diff --git a/rules/ast/persistence_adapter.go b/rules/ast/persistence_adapter.go index 682f643e6..6ea72ad07 100644 --- a/rules/ast/persistence_adapter.go +++ b/rules/ast/persistence_adapter.go @@ -26,9 +26,16 @@ var defaultStalenessDelta = flag.Int("defaultStalenessDelta", 300, "Default stal // (i.e. metric->fingerprint lookups). var queryStorage metric.Storage = nil +// Describes the lenience limits to apply to values from the materialized view. +type StalenessPolicy struct { + // Describes the inclusive limit at which individual points if requested will + // be matched and subject to interpolation. + DeltaAllowance time.Duration +} + type viewAdapter struct { view metric.View - stalenessPolicy *metric.StalenessPolicy + stalenessPolicy StalenessPolicy } // interpolateSamples interpolates a value at a target time between two @@ -165,12 +172,12 @@ func SetStorage(storage metric.Storage) { } func NewViewAdapter(view metric.View) *viewAdapter { - stalenessPolicy := metric.StalenessPolicy{ + stalenessPolicy := StalenessPolicy{ DeltaAllowance: time.Duration(*defaultStalenessDelta) * time.Second, } return &viewAdapter{ view: view, - stalenessPolicy: &stalenessPolicy, + stalenessPolicy: stalenessPolicy, } } diff --git a/storage/metric/end_to_end_test.go b/storage/metric/end_to_end_test.go index 0f581b0ca..737e2ca09 100644 --- a/storage/metric/end_to_end_test.go +++ b/storage/metric/end_to_end_test.go @@ -271,18 +271,17 @@ func AppendRepeatingValuesTests(p MetricPersistence, t test.Tester) { } time := time.Time{}.Add(time.Duration(i) * time.Hour).Add(time.Duration(j) * time.Second) - sample, err := p.GetValueAtTime(fingerprints[0], time, StalenessPolicy{}) - if err != nil { - t.Fatal(err) - } - if sample == nil { - t.Fatal("expected non-nil sample.") + samples := p.GetValueAtTime(fingerprints[0], time) + if len(samples) == 0 { + t.Fatal("expected at least one sample.") } expected := model.SampleValue(i) - if sample.Value != expected { - t.Fatalf("expected %d value, got %d", expected, sample.Value) + for _, sample := range samples { + if sample.Value != expected { + t.Fatalf("expected %d value, got %d", expected, sample.Value) + } } } } @@ -334,18 +333,17 @@ func AppendsRepeatingValuesTests(p MetricPersistence, t test.Tester) { } time := time.Time{}.Add(time.Duration(i) * time.Hour).Add(time.Duration(j) * time.Second) - sample, err := p.GetValueAtTime(fingerprints[0], time, StalenessPolicy{}) - if err != nil { - t.Fatal(err) - } - if sample == nil { - t.Fatal("expected non-nil sample.") + samples := p.GetValueAtTime(fingerprints[0], time) + if len(samples) == 0 { + t.Fatal("expected at least one sample.") } expected := model.SampleValue(i) - if sample.Value != expected { - t.Fatalf("expected %d value, got %d", expected, sample.Value) + for _, sample := range samples { + if sample.Value != expected { + t.Fatalf("expected %d value, got %d", expected, sample.Value) + } } } } diff --git a/storage/metric/interface.go b/storage/metric/interface.go index 27358c657..757cd19db 100644 --- a/storage/metric/interface.go +++ b/storage/metric/interface.go @@ -47,9 +47,9 @@ type MetricPersistence interface { // Get the metric associated with the provided fingerprint. GetMetricForFingerprint(model.Fingerprint) (*model.Metric, error) - GetValueAtTime(model.Fingerprint, time.Time, StalenessPolicy) (*model.Sample, error) - GetBoundaryValues(model.Fingerprint, model.Interval, StalenessPolicy) (*model.Sample, *model.Sample, error) - GetRangeValues(model.Fingerprint, model.Interval) (*model.SampleSet, error) + GetValueAtTime(model.Fingerprint, time.Time) []model.SamplePair + GetBoundaryValues(model.Fingerprint, model.Interval) (first []model.SamplePair, second []model.SamplePair) + GetRangeValues(model.Fingerprint, model.Interval) []model.SamplePair ForEachSample(IteratorsForFingerprintBuilder) (err error) @@ -61,13 +61,6 @@ type MetricPersistence interface { // MakeView(builder ViewRequestBuilder, deadline time.Duration) (View, error) } -// Describes the lenience limits for querying the materialized View. -type StalenessPolicy struct { - // Describes the inclusive limit at which individual points if requested will - // be matched and subject to interpolation. - DeltaAllowance time.Duration -} - // View provides view of the values in the datastore subject to the request of a // preloading operation. type View interface { diff --git a/storage/metric/leveldb.go b/storage/metric/leveldb.go index fef8e4e89..d3acbf46a 100644 --- a/storage/metric/leveldb.go +++ b/storage/metric/leveldb.go @@ -862,322 +862,16 @@ func (l *LevelDBMetricPersistence) GetMetricForFingerprint(f model.Fingerprint) return } -func (l *LevelDBMetricPersistence) GetBoundaryValues(fp model.Fingerprint, i model.Interval, s StalenessPolicy) (open *model.Sample, end *model.Sample, err error) { - begin := time.Now() - - defer func() { - duration := time.Since(begin) - - recordOutcome(duration, err, map[string]string{operation: getBoundaryValues, result: success}, map[string]string{operation: getBoundaryValues, result: failure}) - }() - - // XXX: Maybe we will want to emit incomplete sets? - open, err = l.GetValueAtTime(fp, i.OldestInclusive, s) - if err != nil { - return - } else if open == nil { - return - } - - end, err = l.GetValueAtTime(fp, i.NewestInclusive, s) - if err != nil { - return - } else if end == nil { - open = nil - } - - return +func (l LevelDBMetricPersistence) GetValueAtTime(f model.Fingerprint, t time.Time) (samples []model.SamplePair) { + panic("Not implemented") } -func interpolate(x1, x2 time.Time, y1, y2 float32, e time.Time) float32 { - yDelta := y2 - y1 - xDelta := x2.Sub(x1) - - dDt := yDelta / float32(xDelta) - offset := float32(e.Sub(x1)) - - return y1 + (offset * dDt) +func (l LevelDBMetricPersistence) GetBoundaryValues(f model.Fingerprint, i model.Interval) (first []model.SamplePair, second []model.SamplePair) { + panic("Not implemented") } -func (l *LevelDBMetricPersistence) GetValueAtTime(fp model.Fingerprint, t time.Time, s StalenessPolicy) (sample *model.Sample, err error) { - begin := time.Now() - - defer func() { - duration := time.Since(begin) - - recordOutcome(duration, err, map[string]string{operation: getValueAtTime, result: success}, map[string]string{operation: getValueAtTime, result: failure}) - }() - - // TODO: memoize/cache this or change the return type to metric.SamplePair. - m, err := l.GetMetricForFingerprint(fp) - if err != nil { - return - } - - // Candidate for Refactoring - k := &dto.SampleKey{ - Fingerprint: fp.ToDTO(), - Timestamp: indexable.EncodeTime(t), - } - - e, err := coding.NewProtocolBuffer(k).Encode() - if err != nil { - return - } - - iterator := l.metricSamples.NewIterator(true) - defer iterator.Close() - - if !iterator.Seek(e) { - /* - * Two cases for this: - * 1.) Corruption in LevelDB. - * 2.) Key seek after AND outside known range. - * - * Once a LevelDB iterator goes invalid, it cannot be recovered; thusly, - * we need to create a new in order to check if the last value in the - * database is sufficient for our purposes. This is, in all reality, a - * corner case but one that could bring down the system. - */ - iterator = l.metricSamples.NewIterator(true) - defer iterator.Close() - - if !iterator.SeekToLast() { - /* - * For whatever reason, the LevelDB cannot be recovered. - */ - return - } - } - - var ( - firstKey *dto.SampleKey - firstValue *dto.SampleValueSeries - ) - - firstKey, err = extractSampleKey(iterator) - if err != nil { - return - } - - peekAhead := false - - if !fingerprintsEqual(firstKey.Fingerprint, k.Fingerprint) { - /* - * This allows us to grab values for metrics if our request time is after - * the last recorded time subject to the staleness policy due to the nuances - * of LevelDB storage: - * - * # Assumptions: - * - K0 < K1 in terms of sorting. - * - T0 < T1 in terms of sorting. - * - * # Data - * - * K0-T0 - * K0-T1 - * K0-T2 - * K1-T0 - * K1-T1 - * - * # Scenario - * K0-T3, which does not exist, is requested. LevelDB will thusly seek to - * K1-T1, when K0-T2 exists as a perfectly good candidate to check subject - * to the provided staleness policy and such. - */ - peekAhead = true - } - - firstTime := indexable.DecodeTime(firstKey.Timestamp) - if t.Before(firstTime) || peekAhead { - if !iterator.Previous() { - /* - * Two cases for this: - * 1.) Corruption in LevelDB. - * 2.) Key seek before AND outside known range. - * - * This is an explicit validation to ensure that if no previous values for - * the series are found, the query aborts. - */ - return - } - - var ( - alternativeKey *dto.SampleKey - alternativeValue *dto.SampleValueSeries - ) - - alternativeKey, err = extractSampleKey(iterator) - if err != nil { - return - } - - if !fingerprintsEqual(alternativeKey.Fingerprint, k.Fingerprint) { - return - } - - /* - * At this point, we found a previous value in the same series in the - * database. LevelDB originally seeked to the subsequent element given - * the key, but we need to consider this adjacency instead. - */ - alternativeTime := indexable.DecodeTime(alternativeKey.Timestamp) - - firstKey = alternativeKey - firstValue = alternativeValue - firstTime = alternativeTime - } - - firstDelta := firstTime.Sub(t) - if firstDelta < 0 { - firstDelta *= -1 - } - if firstDelta > s.DeltaAllowance { - return - } - - firstValue, err = extractSampleValues(iterator) - if err != nil { - return - } - - sample = model.SampleFromDTO(m, &t, firstValue) - - if firstDelta == time.Duration(0) { - return - } - - if !iterator.Next() { - /* - * Two cases for this: - * 1.) Corruption in LevelDB. - * 2.) Key seek after AND outside known range. - * - * This means that there are no more values left in the storage; and if this - * point is reached, we know that the one that has been found is within the - * allowed staleness limits. - */ - return - } - - var secondKey *dto.SampleKey - - secondKey, err = extractSampleKey(iterator) - if err != nil { - return - } - - if !fingerprintsEqual(secondKey.Fingerprint, k.Fingerprint) { - return - } else { - /* - * At this point, current entry in the database has the same key as the - * previous. For this reason, the validation logic will expect that the - * distance between the two points shall not exceed the staleness policy - * allowed limit to reduce interpolation errors. - * - * For this reason, the sample is reset in case of other subsequent - * validation behaviors. - */ - sample = nil - } - - secondTime := indexable.DecodeTime(secondKey.Timestamp) - - totalDelta := secondTime.Sub(firstTime) - if totalDelta > s.DeltaAllowance { - return - } - - var secondValue *dto.SampleValueSeries - - secondValue, err = extractSampleValues(iterator) - if err != nil { - return - } - - fValue := *firstValue.Value[0].Value - sValue := *secondValue.Value[0].Value - - interpolated := interpolate(firstTime, secondTime, fValue, sValue, t) - - sampleValue := &dto.SampleValueSeries{} - sampleValue.Value = append(sampleValue.Value, &dto.SampleValueSeries_Value{Value: &interpolated}) - - sample = model.SampleFromDTO(m, &t, sampleValue) - - return -} - -func (l *LevelDBMetricPersistence) GetRangeValues(fp model.Fingerprint, i model.Interval) (v *model.SampleSet, err error) { - begin := time.Now() - - defer func() { - duration := time.Since(begin) - - recordOutcome(duration, err, map[string]string{operation: getRangeValues, result: success}, map[string]string{operation: getRangeValues, result: failure}) - }() - - k := &dto.SampleKey{ - Fingerprint: fp.ToDTO(), - Timestamp: indexable.EncodeTime(i.OldestInclusive), - } - - e, err := coding.NewProtocolBuffer(k).Encode() - if err != nil { - return - } - - iterator := l.metricSamples.NewIterator(true) - defer iterator.Close() - - predicate := keyIsOlderThan(i.NewestInclusive) - - for valid := iterator.Seek(e); valid; valid = iterator.Next() { - retrievedKey := &dto.SampleKey{} - - retrievedKey, err = extractSampleKey(iterator) - if err != nil { - return - } - - if predicate(retrievedKey) { - break - } - - if !fingerprintsEqual(retrievedKey.Fingerprint, k.Fingerprint) { - break - } - - retrievedValue, err := extractSampleValues(iterator) - if err != nil { - return nil, err - } - - if v == nil { - // TODO: memoize/cache this or change the return type to metric.SamplePair. - m, err := l.GetMetricForFingerprint(fp) - if err != nil { - return v, err - } - v = &model.SampleSet{ - Metric: *m, - } - } - - v.Values = append(v.Values, model.SamplePair{ - Value: model.SampleValue(*retrievedValue.Value[0].Value), - Timestamp: indexable.DecodeTime(retrievedKey.Timestamp), - }) - } - - // XXX: We should not explicitly sort here but rather rely on the datastore. - // This adds appreciable overhead. - if v != nil { - sort.Sort(v.Values) - } - - return +func (l *LevelDBMetricPersistence) GetRangeValues(f model.Fingerprint, i model.Interval) (samples []model.SamplePair) { + panic("Not implemented") } type MetricKeyDecoder struct{} diff --git a/storage/metric/memory.go b/storage/metric/memory.go index b62263de3..08c80c84b 100644 --- a/storage/metric/memory.go +++ b/storage/metric/memory.go @@ -19,7 +19,6 @@ import ( "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/utility" "github.com/ryszard/goskiplist/skiplist" - "sort" "time" ) @@ -148,6 +147,19 @@ func (s memorySeriesStorage) AppendSample(sample model.Sample) (err error) { return } +// Append raw sample, bypassing indexing. Only used to add data to views, which +// don't need to lookup by metric. +func (s memorySeriesStorage) appendSampleWithoutIndexing(f model.Fingerprint, timestamp time.Time, value model.SampleValue) { + series, ok := s.fingerprintToSeries[f] + + if !ok { + series = newStream(model.Metric{}) + s.fingerprintToSeries[f] = series + } + + series.add(timestamp, value) +} + func (s memorySeriesStorage) GetFingerprintsForLabelSet(l model.LabelSet) (fingerprints model.Fingerprints, err error) { sets := []utility.Set{} @@ -198,152 +210,99 @@ func (s memorySeriesStorage) GetMetricForFingerprint(f model.Fingerprint) (metri return } -// XXX: Terrible wart. -func interpolateSample(x1, x2 time.Time, y1, y2 float32, e time.Time) model.SampleValue { - yDelta := y2 - y1 - xDelta := x2.Sub(x1) - - dDt := yDelta / float32(xDelta) - offset := float32(e.Sub(x1)) - - return model.SampleValue(y1 + (offset * dDt)) -} - -func (s memorySeriesStorage) GetValueAtTime(fp model.Fingerprint, t time.Time, p StalenessPolicy) (sample *model.Sample, err error) { - series, ok := s.fingerprintToSeries[fp] +func (s memorySeriesStorage) GetValueAtTime(f model.Fingerprint, t time.Time) (samples []model.SamplePair) { + series, ok := s.fingerprintToSeries[f] if !ok { return } iterator := series.values.Seek(skipListTime(t)) if iterator == nil { + // If the iterator is nil, it means we seeked past the end of the series, + // so we seek to the last value instead. Due to the reverse ordering + // defined on skipListTime, this corresponds to the sample with the + // earliest timestamp. + iterator = series.values.SeekToLast() + if iterator == nil { + // The list is empty. + return + } + } + + defer iterator.Close() + + if iterator.Key() == nil || iterator.Value() == nil { return } foundTime := time.Time(iterator.Key().(skipListTime)) - if foundTime.Equal(t) { - value := iterator.Value().(value) - sample = &model.Sample{ - Metric: series.metric, - Value: value.get(), - Timestamp: t, - } + samples = append(samples, model.SamplePair{ + Timestamp: foundTime, + Value: iterator.Value().(value).get(), + }) - return - } - - if t.Sub(foundTime) > p.DeltaAllowance { - return - } - - secondTime := foundTime - secondValue := iterator.Value().(value).get() - - if !iterator.Previous() { - sample = &model.Sample{ - Metric: series.metric, + if foundTime.Before(t) && iterator.Previous() { + samples = append(samples, model.SamplePair{ + Timestamp: time.Time(iterator.Key().(skipListTime)), Value: iterator.Value().(value).get(), - Timestamp: t, - } - return - } - - firstTime := time.Time(iterator.Key().(skipListTime)) - if t.Sub(firstTime) > p.DeltaAllowance { - return - } - - if firstTime.Sub(secondTime) > p.DeltaAllowance { - return - } - - firstValue := iterator.Value().(value).get() - - sample = &model.Sample{ - Metric: series.metric, - Value: interpolateSample(firstTime, secondTime, float32(firstValue), float32(secondValue), t), - Timestamp: t, + }) } return } -func (s memorySeriesStorage) GetBoundaryValues(fp model.Fingerprint, i model.Interval, p StalenessPolicy) (first *model.Sample, second *model.Sample, err error) { - first, err = s.GetValueAtTime(fp, i.OldestInclusive, p) - if err != nil { - return - } else if first == nil { - return - } - - second, err = s.GetValueAtTime(fp, i.NewestInclusive, p) - if err != nil { - return - } else if second == nil { - first = nil - } - +func (s memorySeriesStorage) GetBoundaryValues(f model.Fingerprint, i model.Interval) (first []model.SamplePair, second []model.SamplePair) { + first = s.GetValueAtTime(f, i.OldestInclusive) + second = s.GetValueAtTime(f, i.NewestInclusive) return } -func (s memorySeriesStorage) GetRangeValues(fp model.Fingerprint, i model.Interval) (samples *model.SampleSet, err error) { - series, ok := s.fingerprintToSeries[fp] +func (s memorySeriesStorage) GetRangeValues(f model.Fingerprint, i model.Interval) (samples []model.SamplePair) { + series, ok := s.fingerprintToSeries[f] if !ok { return } - samples = &model.SampleSet{ - Metric: series.metric, + iterator := series.values.Seek(skipListTime(i.OldestInclusive)) + if iterator == nil { + // If the iterator is nil, it means we seeked past the end of the series, + // so we seek to the last value instead. Due to the reverse ordering + // defined on skipListTime, this corresponds to the sample with the + // earliest timestamp. + iterator = series.values.SeekToLast() + if iterator == nil { + // The list is empty. + return + } } - iterator := series.values.Seek(skipListTime(i.NewestInclusive)) - if iterator == nil { - return - } + defer iterator.Close() for { timestamp := time.Time(iterator.Key().(skipListTime)) - if timestamp.Before(i.OldestInclusive) { + if timestamp.After(i.NewestInclusive) { break } - samples.Values = append(samples.Values, - model.SamplePair{ + if !timestamp.Before(i.OldestInclusive) { + samples = append(samples, model.SamplePair{ Value: iterator.Value().(value).get(), Timestamp: timestamp, }) + } - if !iterator.Next() { + if !iterator.Previous() { break } } - // XXX: We should not explicitly sort here but rather rely on the datastore. - // This adds appreciable overhead. - if samples != nil { - sort.Sort(samples.Values) - } - return } func (s memorySeriesStorage) Close() { - // This can probably be simplified: - // - // s.fingerPrintToSeries = map[model.Fingerprint]*stream{} - // s.labelPairToFingerprints = map[string]model.Fingerprints{} - // s.labelNameToFingerprints = map[model.LabelName]model.Fingerprints{} - for fingerprint := range s.fingerprintToSeries { - delete(s.fingerprintToSeries, fingerprint) - } - - for labelPair := range s.labelPairToFingerprints { - delete(s.labelPairToFingerprints, labelPair) - } - - for labelName := range s.labelNameToFingerprints { - delete(s.labelNameToFingerprints, labelName) - } + s.fingerprintToSeries = map[model.Fingerprint]stream{} + s.labelPairToFingerprints = map[string]model.Fingerprints{} + s.labelNameToFingerprints = map[model.LabelName]model.Fingerprints{} } func (s memorySeriesStorage) GetAllValuesForLabel(labelName model.LabelName) (values model.LabelValues, err error) { diff --git a/storage/metric/rule_integration_test.go b/storage/metric/rule_integration_test.go index 65dcec2ea..c7c6e2fec 100644 --- a/storage/metric/rule_integration_test.go +++ b/storage/metric/rule_integration_test.go @@ -30,21 +30,18 @@ func GetValueAtTimeTests(persistenceMaker func() (MetricPersistence, test.Closer } type input struct { - year int - month time.Month - day int - hour int - staleness time.Duration + year int + month time.Month + day int + hour int } - type output struct { - value model.SampleValue - } + type output []model.SampleValue type behavior struct { name string input input - output *output + output output } var contexts = []struct { @@ -59,11 +56,10 @@ func GetValueAtTimeTests(persistenceMaker func() (MetricPersistence, test.Closer { name: "random target", input: input{ - year: 1984, - month: 3, - day: 30, - hour: 0, - staleness: time.Duration(0), + year: 1984, + month: 3, + day: 30, + hour: 0, }, }, }, @@ -81,92 +77,39 @@ func GetValueAtTimeTests(persistenceMaker func() (MetricPersistence, test.Closer }, behaviors: []behavior{ { - name: "exact without staleness policy", + name: "exact", input: input{ - year: 1984, - month: 3, - day: 30, - hour: 0, - staleness: time.Duration(0), + year: 1984, + month: 3, + day: 30, + hour: 0, }, - output: &output{ - value: 0, + output: output{ + 0, }, }, { - name: "exact with staleness policy", + name: "before", input: input{ - year: 1984, - month: 3, - day: 30, - hour: 0, - staleness: time.Duration(365*24) * time.Hour, + year: 1984, + month: 3, + day: 29, + hour: 0, }, - output: &output{ - value: 0, + output: output{ + 0, }, }, { - name: "before without staleness policy", + name: "after", input: input{ - year: 1984, - month: 3, - day: 29, - hour: 0, - staleness: time.Duration(0), + year: 1984, + month: 3, + day: 31, + hour: 0, }, - }, - { - name: "before within staleness policy", - input: input{ - year: 1984, - month: 3, - day: 29, - hour: 0, - staleness: time.Duration(365*24) * time.Hour, - }, - }, - { - name: "before outside staleness policy", - input: input{ - year: 1984, - month: 3, - day: 29, - hour: 0, - staleness: time.Duration(1) * time.Hour, - }, - }, - { - name: "after without staleness policy", - input: input{ - year: 1984, - month: 3, - day: 31, - hour: 0, - staleness: time.Duration(0), - }, - }, - { - name: "after within staleness policy", - input: input{ - year: 1984, - month: 3, - day: 31, - hour: 0, - staleness: time.Duration(365*24) * time.Hour, - }, - output: &output{ - value: 0, - }, - }, - { - name: "after outside staleness policy", - input: input{ - year: 1984, - month: 4, - day: 7, - hour: 0, - staleness: time.Duration(7*24) * time.Hour, + output: output{ + 0, }, }, }, @@ -191,131 +134,64 @@ func GetValueAtTimeTests(persistenceMaker func() (MetricPersistence, test.Closer }, behaviors: []behavior{ { - name: "exact first without staleness policy", + name: "exact first", input: input{ - year: 1984, - month: 3, - day: 30, - hour: 0, - staleness: time.Duration(0), + year: 1984, + month: 3, + day: 30, + hour: 0, }, - output: &output{ - value: 0, + output: output{ + 0, }, }, { - name: "exact first with staleness policy", + name: "exact second", input: input{ - year: 1984, - month: 3, - day: 30, - hour: 0, - staleness: time.Duration(365*24) * time.Hour, + year: 1985, + month: 3, + day: 30, + hour: 0, }, - output: &output{ - value: 0, + output: output{ + 1, }, }, { - name: "exact second without staleness policy", + name: "before first", input: input{ - year: 1985, - month: 3, - day: 30, - hour: 0, - staleness: time.Duration(0), + year: 1983, + month: 9, + day: 29, + hour: 12, }, - output: &output{ - value: 1, + output: output{ + 0, }, }, { - name: "exact second with staleness policy", + name: "after second", input: input{ - year: 1985, - month: 3, - day: 30, - hour: 0, - staleness: time.Duration(365*24) * time.Hour, + year: 1985, + month: 9, + day: 28, + hour: 12, }, - output: &output{ - value: 1, + output: output{ + 1, }, }, { - name: "before first without staleness policy", + name: "middle", input: input{ - year: 1983, - month: 9, - day: 29, - hour: 12, - staleness: time.Duration(0), + year: 1984, + month: 9, + day: 28, + hour: 12, }, - }, - { - name: "before first with staleness policy", - input: input{ - year: 1983, - month: 9, - day: 29, - hour: 12, - staleness: time.Duration(365*24) * time.Hour, - }, - }, - { - name: "after second with staleness policy", - input: input{ - year: 1985, - month: 9, - day: 28, - hour: 12, - staleness: time.Duration(365*24) * time.Hour, - }, - output: &output{ - value: 1, - }, - }, - { - name: "after second without staleness policy", - input: input{ - year: 1985, - month: 9, - day: 28, - hour: 12, - staleness: time.Duration(0), - }, - }, - { - name: "middle without staleness policy", - input: input{ - year: 1984, - month: 9, - day: 28, - hour: 12, - staleness: time.Duration(0), - }, - }, - { - name: "middle with insufficient staleness policy", - input: input{ - year: 1984, - month: 9, - day: 28, - hour: 12, - staleness: time.Duration(364*24) * time.Hour, - }, - }, - { - name: "middle with sufficient staleness policy", - input: input{ - year: 1984, - month: 9, - day: 28, - hour: 12, - staleness: time.Duration(365*24) * time.Hour, - }, - output: &output{ - value: 0.5, + output: output{ + 0, + 1, }, }, }, @@ -347,200 +223,89 @@ func GetValueAtTimeTests(persistenceMaker func() (MetricPersistence, test.Closer }, behaviors: []behavior{ { - name: "exact first without staleness policy", + name: "exact first", input: input{ - year: 1984, - month: 3, - day: 30, - hour: 0, - staleness: time.Duration(0), + year: 1984, + month: 3, + day: 30, + hour: 0, }, - output: &output{ - value: 0, + output: output{ + 0, }, }, { - name: "exact first with staleness policy", + name: "exact second", input: input{ - year: 1984, - month: 3, - day: 30, - hour: 0, - staleness: time.Duration(365*24) * time.Hour, + year: 1985, + month: 3, + day: 30, + hour: 0, }, - output: &output{ - value: 0, + output: output{ + 1, }, }, { - name: "exact second without staleness policy", + name: "exact third", input: input{ - year: 1985, - month: 3, - day: 30, - hour: 0, - staleness: time.Duration(0), + year: 1986, + month: 3, + day: 30, + hour: 0, }, - output: &output{ - value: 1, + output: output{ + 2, }, }, { - name: "exact second with staleness policy", + name: "before first", input: input{ - year: 1985, - month: 3, - day: 30, - hour: 0, - staleness: time.Duration(365*24) * time.Hour, + year: 1983, + month: 9, + day: 29, + hour: 12, }, - output: &output{ - value: 1, + output: output{ + 0, }, }, { - name: "exact third without staleness policy", + name: "after third", input: input{ - year: 1986, - month: 3, - day: 30, - hour: 0, - staleness: time.Duration(0), + year: 1986, + month: 9, + day: 28, + hour: 12, }, - output: &output{ - value: 2, + output: output{ + 2, }, }, { - name: "exact third with staleness policy", + name: "first middle", input: input{ - year: 1986, - month: 3, - day: 30, - hour: 0, - staleness: time.Duration(365*24) * time.Hour, + year: 1984, + month: 9, + day: 28, + hour: 12, }, - output: &output{ - value: 2, + output: output{ + 0, + 1, }, }, { - name: "before first without staleness policy", + name: "second middle", input: input{ - year: 1983, - month: 9, - day: 29, - hour: 12, - staleness: time.Duration(0), + year: 1985, + month: 9, + day: 28, + hour: 12, }, - }, - { - name: "before first with staleness policy", - input: input{ - year: 1983, - month: 9, - day: 29, - hour: 12, - staleness: time.Duration(365*24) * time.Hour, - }, - }, - { - name: "after third within staleness policy", - input: input{ - year: 1986, - month: 9, - day: 28, - hour: 12, - staleness: time.Duration(365*24) * time.Hour, - }, - output: &output{ - value: 2, - }, - }, - { - name: "after third outside staleness policy", - input: input{ - year: 1986, - month: 9, - day: 28, - hour: 12, - staleness: time.Duration(1*24) * time.Hour, - }, - }, - { - name: "after third without staleness policy", - input: input{ - year: 1986, - month: 9, - day: 28, - hour: 12, - staleness: time.Duration(0), - }, - }, - { - name: "first middle without staleness policy", - input: input{ - year: 1984, - month: 9, - day: 28, - hour: 12, - staleness: time.Duration(0), - }, - }, - { - name: "first middle with insufficient staleness policy", - input: input{ - year: 1984, - month: 9, - day: 28, - hour: 12, - staleness: time.Duration(364*24) * time.Hour, - }, - }, - { - name: "first middle with sufficient staleness policy", - input: input{ - year: 1984, - month: 9, - day: 28, - hour: 12, - staleness: time.Duration(365*24) * time.Hour, - }, - output: &output{ - value: 0.5, - }, - }, - { - name: "second middle without staleness policy", - input: input{ - year: 1985, - month: 9, - day: 28, - hour: 12, - staleness: time.Duration(0), - }, - }, - { - name: "second middle with insufficient staleness policy", - input: input{ - year: 1985, - month: 9, - day: 28, - hour: 12, - staleness: time.Duration(364*24) * time.Hour, - }, - }, - { - name: "second middle with sufficient staleness policy", - input: input{ - year: 1985, - month: 9, - day: 28, - hour: 12, - staleness: time.Duration(365*24) * time.Hour, - }, - output: &output{ - value: 1.5, + output: output{ + 1, + 2, }, }, }, @@ -570,27 +335,16 @@ func GetValueAtTimeTests(persistenceMaker func() (MetricPersistence, test.Closer for j, behavior := range context.behaviors { input := behavior.input time := time.Date(input.year, input.month, input.day, input.hour, 0, 0, 0, time.UTC) - sp := StalenessPolicy{ - DeltaAllowance: input.staleness, + + actual := p.GetValueAtTime(model.NewFingerprintFromMetric(m), time) + + if len(behavior.output) != len(actual) { + t.Fatalf("%d.%d(%s). Expected %d samples but got: %v\n", i, j, behavior.name, len(behavior.output), actual) } + for k, samplePair := range actual { + if samplePair.Value != behavior.output[k] { + t.Fatalf("%d.%d.%d(%s). Expected %s but got %s\n", i, j, k, behavior.name, behavior.output[k], samplePair) - actual, err := p.GetValueAtTime(model.NewFingerprintFromMetric(m), time, sp) - if err != nil { - t.Fatalf("%d.%d(%s). Could not query for value: %q\n", i, j, behavior.name, err) - } - - if behavior.output == nil { - if actual != nil { - t.Fatalf("%d.%d(%s). Expected nil but got: %q\n", i, j, behavior.name, actual) - } - } else { - if actual == nil { - t.Fatalf("%d.%d(%s). Expected %s but got nil\n", i, j, behavior.name, behavior.output) - } else { - if actual.Value != behavior.output.value { - t.Fatalf("%d.%d(%s). Expected %s but got %s\n", i, j, behavior.name, behavior.output, actual) - - } } } } @@ -616,18 +370,17 @@ func GetBoundaryValuesTests(persistenceMaker func() (MetricPersistence, test.Clo endMonth time.Month endDay int endHour int - staleness time.Duration } type output struct { - open model.SampleValue - end model.SampleValue + open []model.SampleValue + end []model.SampleValue } type behavior struct { name string input input - output *output + output output } var contexts = []struct { @@ -640,7 +393,7 @@ func GetBoundaryValuesTests(persistenceMaker func() (MetricPersistence, test.Clo values: []value{}, behaviors: []behavior{ { - name: "non-existent interval without staleness policy", + name: "non-existent interval", input: input{ openYear: 1984, openMonth: 3, @@ -650,21 +403,6 @@ func GetBoundaryValuesTests(persistenceMaker func() (MetricPersistence, test.Clo endMonth: 3, endDay: 30, endHour: 0, - staleness: time.Duration(0), - }, - }, - { - name: "non-existent interval with staleness policy", - input: input{ - openYear: 1984, - openMonth: 3, - openDay: 30, - openHour: 0, - endYear: 1985, - endMonth: 3, - endDay: 30, - endHour: 0, - staleness: time.Duration(365*24) * time.Hour, }, }, }, @@ -682,7 +420,7 @@ func GetBoundaryValuesTests(persistenceMaker func() (MetricPersistence, test.Clo }, behaviors: []behavior{ { - name: "on start but missing end without staleness policy", + name: "on start but missing end", input: input{ openYear: 1984, openMonth: 3, @@ -692,11 +430,14 @@ func GetBoundaryValuesTests(persistenceMaker func() (MetricPersistence, test.Clo endMonth: 3, endDay: 30, endHour: 0, - staleness: time.Duration(0), + }, + output: output{ + open: []model.SampleValue{0}, + end: []model.SampleValue{0}, }, }, { - name: "non-existent interval after within staleness policy", + name: "non-existent interval after", input: input{ openYear: 1984, openMonth: 3, @@ -706,25 +447,14 @@ func GetBoundaryValuesTests(persistenceMaker func() (MetricPersistence, test.Clo endMonth: 3, endDay: 30, endHour: 0, - staleness: time.Duration(4380) * time.Hour, + }, + output: output{ + open: []model.SampleValue{0}, + end: []model.SampleValue{0}, }, }, { - name: "non-existent interval after without staleness policy", - input: input{ - openYear: 1984, - openMonth: 3, - openDay: 31, - openHour: 0, - endYear: 1985, - endMonth: 3, - endDay: 30, - endHour: 0, - staleness: time.Duration(0), - }, - }, - { - name: "non-existent interval before with staleness policy", + name: "non-existent interval before", input: input{ openYear: 1983, openMonth: 3, @@ -734,25 +464,14 @@ func GetBoundaryValuesTests(persistenceMaker func() (MetricPersistence, test.Clo endMonth: 3, endDay: 29, endHour: 0, - staleness: time.Duration(365*24) * time.Hour, + }, + output: output{ + open: []model.SampleValue{0}, + end: []model.SampleValue{0}, }, }, { - name: "non-existent interval before without staleness policy", - input: input{ - openYear: 1983, - openMonth: 3, - openDay: 30, - openHour: 0, - endYear: 1984, - endMonth: 3, - endDay: 29, - endHour: 0, - staleness: time.Duration(0), - }, - }, - { - name: "on end but not start without staleness policy", + name: "on end but not start", input: input{ openYear: 1983, openMonth: 3, @@ -762,25 +481,14 @@ func GetBoundaryValuesTests(persistenceMaker func() (MetricPersistence, test.Clo endMonth: 3, endDay: 30, endHour: 0, - staleness: time.Duration(0), + }, + output: output{ + open: []model.SampleValue{0}, + end: []model.SampleValue{0}, }, }, { - name: "on end but not start without staleness policy", - input: input{ - openYear: 1983, - openMonth: 3, - openDay: 30, - openHour: 0, - endYear: 1984, - endMonth: 3, - endDay: 30, - endHour: 0, - staleness: time.Duration(365*24) * time.Hour, - }, - }, - { - name: "before point without staleness policy", + name: "before point", input: input{ openYear: 1982, openMonth: 3, @@ -790,25 +498,14 @@ func GetBoundaryValuesTests(persistenceMaker func() (MetricPersistence, test.Clo endMonth: 3, endDay: 30, endHour: 0, - staleness: time.Duration(0), + }, + output: output{ + open: []model.SampleValue{0}, + end: []model.SampleValue{0}, }, }, { - name: "before point with staleness policy", - input: input{ - openYear: 1982, - openMonth: 3, - openDay: 30, - openHour: 0, - endYear: 1983, - endMonth: 3, - endDay: 30, - endHour: 0, - staleness: time.Duration(365*24) * time.Hour, - }, - }, - { - name: "after point without staleness policy", + name: "after point", input: input{ openYear: 1985, openMonth: 3, @@ -818,25 +515,14 @@ func GetBoundaryValuesTests(persistenceMaker func() (MetricPersistence, test.Clo endMonth: 3, endDay: 30, endHour: 0, - staleness: time.Duration(0), + }, + output: output{ + open: []model.SampleValue{0}, + end: []model.SampleValue{0}, }, }, { - name: "after point with staleness policy", - input: input{ - openYear: 1985, - openMonth: 3, - openDay: 30, - openHour: 0, - endYear: 1986, - endMonth: 3, - endDay: 30, - endHour: 0, - staleness: time.Duration(365*24) * time.Hour, - }, - }, - { - name: "spanning point without staleness policy", + name: "spanning point", input: input{ openYear: 1983, openMonth: 9, @@ -846,21 +532,10 @@ func GetBoundaryValuesTests(persistenceMaker func() (MetricPersistence, test.Clo endMonth: 9, endDay: 28, endHour: 12, - staleness: time.Duration(0), }, - }, - { - name: "spanning point with staleness policy", - input: input{ - openYear: 1983, - openMonth: 9, - openDay: 29, - openHour: 12, - endYear: 1984, - endMonth: 9, - endDay: 28, - endHour: 12, - staleness: time.Duration(365*24) * time.Hour, + output: output{ + open: []model.SampleValue{0}, + end: []model.SampleValue{0}, }, }, }, @@ -885,7 +560,7 @@ func GetBoundaryValuesTests(persistenceMaker func() (MetricPersistence, test.Clo }, behaviors: []behavior{ { - name: "on points without staleness policy", + name: "on points", input: input{ openYear: 1984, openMonth: 3, @@ -895,33 +570,14 @@ func GetBoundaryValuesTests(persistenceMaker func() (MetricPersistence, test.Clo endMonth: 3, endDay: 30, endHour: 0, - staleness: time.Duration(0), }, - output: &output{ - open: 0, - end: 1, + output: output{ + open: []model.SampleValue{0}, + end: []model.SampleValue{1}, }, }, { - name: "on points with staleness policy", - input: input{ - openYear: 1984, - openMonth: 3, - openDay: 30, - openHour: 0, - endYear: 1985, - endMonth: 3, - endDay: 30, - endHour: 0, - staleness: time.Duration(365*24) * time.Hour, - }, - output: &output{ - open: 0, - end: 1, - }, - }, - { - name: "on first before second outside of staleness", + name: "on first before second", input: input{ openYear: 1984, openMonth: 3, @@ -931,25 +587,14 @@ func GetBoundaryValuesTests(persistenceMaker func() (MetricPersistence, test.Clo endMonth: 6, endDay: 29, endHour: 6, - staleness: time.Duration(2190) * time.Hour, + }, + output: output{ + open: []model.SampleValue{0}, + end: []model.SampleValue{0, 1}, }, }, { - name: "on first before second within staleness", - input: input{ - openYear: 1984, - openMonth: 3, - openDay: 30, - openHour: 0, - endYear: 1984, - endMonth: 6, - endDay: 29, - endHour: 6, - staleness: time.Duration(356*24) * time.Hour, - }, - }, - { - name: "on first after second outside of staleness", + name: "on first after second", input: input{ openYear: 1984, openMonth: 3, @@ -959,25 +604,10 @@ func GetBoundaryValuesTests(persistenceMaker func() (MetricPersistence, test.Clo endMonth: 6, endDay: 29, endHour: 6, - staleness: time.Duration(1) * time.Hour, }, - }, - { - name: "on first after second within staleness", - input: input{ - openYear: 1984, - openMonth: 3, - openDay: 30, - openHour: 0, - endYear: 1985, - endMonth: 6, - endDay: 29, - endHour: 6, - staleness: time.Duration(356*24) * time.Hour, - }, - output: &output{ - open: 0, - end: 1, + output: output{ + open: []model.SampleValue{0}, + end: []model.SampleValue{1}, }, }, }, @@ -1012,35 +642,23 @@ func GetBoundaryValuesTests(persistenceMaker func() (MetricPersistence, test.Clo OldestInclusive: open, NewestInclusive: end, } - po := StalenessPolicy{ - DeltaAllowance: input.staleness, + openValues, endValues := p.GetBoundaryValues(model.NewFingerprintFromMetric(m), interval) + if len(behavior.output.open) != len(openValues) { + t.Fatalf("%d.%d(%s). Expected %d open values but got: %q\n", i, j, behavior.name, len(behavior.output.open), openValues) + } + if len(behavior.output.end) != len(endValues) { + t.Fatalf("%d.%d(%s). Expected %d open values but got: %q\n", i, j, behavior.name, len(behavior.output.open), openValues) } - openValue, endValue, err := p.GetBoundaryValues(model.NewFingerprintFromMetric(m), interval, po) - if err != nil { - t.Fatalf("%d.%d(%s). Could not query for value: %q\n", i, j, behavior.name, err) + for k, samplePair := range openValues { + if samplePair.Value != behavior.output.open[k] { + t.Fatalf("%d.%d.%d(%s). Expected open to be %v but got %v\n", i, j, k, behavior.name, behavior.output.open[k], samplePair.Value) + } } - if behavior.output == nil { - if openValue != nil { - t.Fatalf("%d.%d(%s). Expected open to be nil but got: %q\n", i, j, behavior.name, openValue) - } - if endValue != nil { - t.Fatalf("%d.%d(%s). Expected end to be nil but got: %q\n", i, j, behavior.name, endValue) - } - } else { - if openValue == nil { - t.Fatalf("%d.%d(%s). Expected open to be %s but got nil\n", i, j, behavior.name, behavior.output) - } - if endValue == nil { - t.Fatalf("%d.%d(%s). Expected end to be %s but got nil\n", i, j, behavior.name, behavior.output) - } - if openValue.Value != behavior.output.open { - t.Fatalf("%d.%d(%s). Expected open to be %s but got %s\n", i, j, behavior.name, behavior.output.open, openValue.Value) - } - - if endValue.Value != behavior.output.end { - t.Fatalf("%d.%d(%s). Expected end to be %s but got %s\n", i, j, behavior.name, behavior.output.end, endValue.Value) + for k, samplePair := range endValues { + if samplePair.Value != behavior.output.end[k] { + t.Fatalf("%d.%d.%d(%s). Expected end to be %v but got %v\n", i, j, k, behavior.name, behavior.output.end[k], samplePair.Value) } } } @@ -1362,10 +980,7 @@ func GetRangeValuesTests(persistenceMaker func() (MetricPersistence, test.Closer NewestInclusive: end, } - values, err := p.GetRangeValues(model.NewFingerprintFromMetric(m), in) - if err != nil { - t.Fatalf("%d.%d(%s). Could not query for value: %q\n", i, j, behavior.name, err) - } + values := p.GetRangeValues(model.NewFingerprintFromMetric(m), in) if values == nil && len(behavior.output) != 0 { t.Fatalf("%d.%d(%s). Expected %s but got: %s\n", i, j, behavior.name, behavior.output, values) @@ -1376,11 +991,11 @@ func GetRangeValuesTests(persistenceMaker func() (MetricPersistence, test.Closer t.Fatalf("%d.%d(%s). Expected nil values but got: %s\n", i, j, behavior.name, values) } } else { - if len(behavior.output) != len(values.Values) { - t.Fatalf("%d.%d(%s). Expected length %d but got: %d\n", i, j, behavior.name, len(behavior.output), len(values.Values)) + if len(behavior.output) != len(values) { + t.Fatalf("%d.%d(%s). Expected length %d but got: %d\n", i, j, behavior.name, len(behavior.output), len(values)) } - for k, actual := range values.Values { + for k, actual := range values { expected := behavior.output[k] if actual.Value != model.SampleValue(expected.value) { @@ -1412,53 +1027,6 @@ func GetRangeValuesTests(persistenceMaker func() (MetricPersistence, test.Closer // Test Definitions Follow -func testLevelDBGetValueAtTime(t test.Tester) { - persistenceMaker := buildLevelDBTestPersistencesMaker("get_value_at_time", t) - GetValueAtTimeTests(persistenceMaker, t) -} - -func TestLevelDBGetValueAtTime(t *testing.T) { - testLevelDBGetValueAtTime(t) -} - -func BenchmarkLevelDBGetValueAtTime(b *testing.B) { - for i := 0; i < b.N; i++ { - testLevelDBGetValueAtTime(b) - } -} - -func testLevelDBGetBoundaryValues(t test.Tester) { - persistenceMaker := buildLevelDBTestPersistencesMaker("get_boundary_values", t) - - GetBoundaryValuesTests(persistenceMaker, t) -} - -func TestLevelDBGetBoundaryValues(t *testing.T) { - testLevelDBGetBoundaryValues(t) -} - -func BenchmarkLevelDBGetBoundaryValues(b *testing.B) { - for i := 0; i < b.N; i++ { - testLevelDBGetBoundaryValues(b) - } -} - -func testLevelDBGetRangeValues(t test.Tester) { - persistenceMaker := buildLevelDBTestPersistencesMaker("get_range_values", t) - - GetRangeValuesTests(persistenceMaker, t) -} - -func TestLevelDBGetRangeValues(t *testing.T) { - testLevelDBGetRangeValues(t) -} - -func BenchmarkLevelDBGetRangeValues(b *testing.B) { - for i := 0; i < b.N; i++ { - testLevelDBGetRangeValues(b) - } -} - func testMemoryGetValueAtTime(t test.Tester) { persistenceMaker := func() (MetricPersistence, test.Closer) { return NewMemorySeriesStorage(), test.NilCloser diff --git a/storage/metric/stochastic_test.go b/storage/metric/stochastic_test.go index 6045da867..1c22a0ee3 100644 --- a/storage/metric/stochastic_test.go +++ b/storage/metric/stochastic_test.go @@ -15,7 +15,10 @@ package metric import ( "fmt" + "github.com/prometheus/prometheus/coding" + "github.com/prometheus/prometheus/coding/indexable" "github.com/prometheus/prometheus/model" + dto "github.com/prometheus/prometheus/model/generated" "github.com/prometheus/prometheus/utility/test" "math" "math/rand" @@ -185,6 +188,60 @@ func AppendSampleAsPureSingleEntityAppendTests(p MetricPersistence, t test.Teste } } +func levelDBGetRangeValues(l *LevelDBMetricPersistence, fp model.Fingerprint, i model.Interval) (samples []model.SamplePair, err error) { + begin := time.Now() + + defer func() { + duration := time.Since(begin) + + recordOutcome(duration, err, map[string]string{operation: getRangeValues, result: success}, map[string]string{operation: getRangeValues, result: failure}) + }() + + k := &dto.SampleKey{ + Fingerprint: fp.ToDTO(), + Timestamp: indexable.EncodeTime(i.OldestInclusive), + } + + e, err := coding.NewProtocolBuffer(k).Encode() + if err != nil { + return + } + + iterator := l.metricSamples.NewIterator(true) + defer iterator.Close() + + predicate := keyIsOlderThan(i.NewestInclusive) + + for valid := iterator.Seek(e); valid; valid = iterator.Next() { + retrievedKey := &dto.SampleKey{} + + retrievedKey, err = extractSampleKey(iterator) + if err != nil { + return + } + + if predicate(retrievedKey) { + break + } + + if !fingerprintsEqual(retrievedKey.Fingerprint, k.Fingerprint) { + break + } + + retrievedValue, err := extractSampleValues(iterator) + if err != nil { + return nil, err + } + + samples = append(samples, model.SamplePair{ + Value: model.SampleValue(*retrievedValue.Value[0].Value), + Timestamp: indexable.DecodeTime(retrievedKey.Timestamp), + }) + } + + return +} + func StochasticTests(persistenceMaker func() (MetricPersistence, test.Closer), t test.Tester) { stochastic := func(x int) (success bool) { p, closer := persistenceMaker() @@ -408,14 +465,22 @@ func StochasticTests(persistenceMaker func() (MetricPersistence, test.Closer), t NewestInclusive: time.Unix(end, 0), } - samples, err := p.GetRangeValues(model.NewFingerprintFromMetric(metric), interval) - if err != nil { - t.Error(err) - return + samples := []model.SamplePair{} + fp := model.NewFingerprintFromMetric(metric) + switch persistence := p.(type) { + case *LevelDBMetricPersistence: + var err error + samples, err = levelDBGetRangeValues(persistence, fp, interval) + if err != nil { + t.Fatal(err) + return + } + default: + samples = p.GetRangeValues(fp, interval) } - if len(samples.Values) < 2 { - t.Errorf("expected sample count less than %d, got %d", 2, len(samples.Values)) + if len(samples) < 2 { + t.Errorf("expected sample count less than %d, got %d", 2, len(samples)) return } } diff --git a/storage/metric/view.go b/storage/metric/view.go index 3027328eb..79fd2cd88 100644 --- a/storage/metric/view.go +++ b/storage/metric/view.go @@ -15,7 +15,6 @@ package metric import ( "github.com/prometheus/prometheus/model" - "github.com/ryszard/goskiplist/skiplist" "sort" "time" ) @@ -102,128 +101,13 @@ func (v viewRequestBuilder) ScanJobs() (j scanJobs) { } type view struct { - fingerprintToSeries map[model.Fingerprint]viewStream + memorySeriesStorage } func (v view) appendSample(fingerprint model.Fingerprint, timestamp time.Time, value model.SampleValue) { - var ( - series, ok = v.fingerprintToSeries[fingerprint] - ) - - if !ok { - series = newViewStream() - v.fingerprintToSeries[fingerprint] = series - } - - series.add(timestamp, value) -} - -func (v view) Close() { - v.fingerprintToSeries = make(map[model.Fingerprint]viewStream) -} - -func (v view) GetValueAtTime(f model.Fingerprint, t time.Time) (samples []model.SamplePair) { - series, ok := v.fingerprintToSeries[f] - if !ok { - return - } - - iterator := series.values.Seek(skipListTime(t)) - if iterator == nil { - // If the iterator is nil, it means we seeked past the end of the series, - // so we seek to the last value instead. Due to the reverse ordering - // defined on skipListTime, this corresponds to the sample with the - // earliest timestamp. - iterator = series.values.SeekToLast() - if iterator == nil { - // The list is empty. - return - } - } - - defer iterator.Close() - - if iterator.Key() == nil || iterator.Value() == nil { - return - } - - samples = append(samples, model.SamplePair{ - Timestamp: time.Time(iterator.Key().(skipListTime)), - Value: iterator.Value().(value).get(), - }) - - if iterator.Previous() { - samples = append(samples, model.SamplePair{ - Timestamp: time.Time(iterator.Key().(skipListTime)), - Value: iterator.Value().(value).get(), - }) - } - - return -} - -func (v view) GetBoundaryValues(f model.Fingerprint, i model.Interval) (first []model.SamplePair, second []model.SamplePair) { - first = v.GetValueAtTime(f, i.OldestInclusive) - second = v.GetValueAtTime(f, i.NewestInclusive) - return -} - -func (v view) GetRangeValues(f model.Fingerprint, i model.Interval) (samples []model.SamplePair) { - series, ok := v.fingerprintToSeries[f] - if !ok { - return - } - - iterator := series.values.Seek(skipListTime(i.OldestInclusive)) - if iterator == nil { - // If the iterator is nil, it means we seeked past the end of the series, - // so we seek to the last value instead. Due to the reverse ordering - // defined on skipListTime, this corresponds to the sample with the - // earliest timestamp. - iterator = series.values.SeekToLast() - if iterator == nil { - // The list is empty. - return - } - } - - for { - timestamp := time.Time(iterator.Key().(skipListTime)) - if timestamp.After(i.NewestInclusive) { - break - } - - if !timestamp.Before(i.OldestInclusive) { - samples = append(samples, model.SamplePair{ - Value: iterator.Value().(value).get(), - Timestamp: timestamp, - }) - } - - if !iterator.Previous() { - break - } - } - - return + v.appendSampleWithoutIndexing(fingerprint, timestamp, value) } func newView() view { - return view{ - fingerprintToSeries: make(map[model.Fingerprint]viewStream), - } -} - -type viewStream struct { - values *skiplist.SkipList -} - -func (s viewStream) add(timestamp time.Time, value model.SampleValue) { - s.values.Set(skipListTime(timestamp), singletonValue(value)) -} - -func newViewStream() viewStream { - return viewStream{ - values: skiplist.New(), - } + return view{NewMemorySeriesStorage()} } diff --git a/storage/metric/view_test.go b/storage/metric/view_test.go index 333b628e1..5335c3b06 100644 --- a/storage/metric/view_test.go +++ b/storage/metric/view_test.go @@ -54,7 +54,7 @@ func testBuilder(t test.Tester) { in in out out }{ - // // Ensure that the fingerprint is sorted in proper order. + // Ensure that the fingerprint is sorted in proper order. { in: in{ atTimes: []atTime{