diff --git a/model/metric.go b/model/metric.go index 0cdae9262..c254f3d92 100644 --- a/model/metric.go +++ b/model/metric.go @@ -102,6 +102,18 @@ func (v Values) Swap(i, j int) { v[i], v[j] = v[j], v[i] } +// FirstTimeAfter indicates whether the first sample of a set is after a given +// timestamp. +func (v Values) FirstTimeAfter(t time.Time) bool { + return v[0].Timestamp.After(t) +} + +// LastTimeBefore indicates whether the last sample of a set is before a given +// timestamp. +func (v Values) LastTimeBefore(t time.Time) bool { + return v[len(v)-1].Timestamp.Before(t) +} + // InsideInterval indicates whether a given range of sorted values could contain // a value for a given time. func (v Values) InsideInterval(t time.Time) (s bool) { diff --git a/rules/ast/ast.go b/rules/ast/ast.go index 7dc89f3a0..1ba59cc2e 100644 --- a/rules/ast/ast.go +++ b/rules/ast/ast.go @@ -289,7 +289,7 @@ func EvalVectorRange(node VectorNode, start time.Time, end time.Time, interval t if sampleSets[groupingKey] == nil { sampleSets[groupingKey] = &model.SampleSet{ Metric: sample.Metric, - Values: []model.SamplePair{samplePair}, + Values: model.Values{samplePair}, } } else { sampleSets[groupingKey].Values = append(sampleSets[groupingKey].Values, samplePair) diff --git a/rules/ast/persistence_adapter.go b/rules/ast/persistence_adapter.go index 6ea72ad07..a687e636e 100644 --- a/rules/ast/persistence_adapter.go +++ b/rules/ast/persistence_adapter.go @@ -57,7 +57,7 @@ func interpolateSamples(first, second *model.SamplePair, timestamp time.Time) *m // surrounding a given target time. If samples are found both before and after // the target time, the sample value is interpolated between these. Otherwise, // the single closest sample is returned verbatim. -func (v *viewAdapter) chooseClosestSample(samples []model.SamplePair, timestamp time.Time) (sample *model.SamplePair) { +func (v *viewAdapter) chooseClosestSample(samples model.Values, timestamp time.Time) (sample *model.SamplePair) { var closestBefore *model.SamplePair var closestAfter *model.SamplePair for _, candidate := range samples { diff --git a/rules/testdata.go b/rules/testdata.go index 1fb833814..e14234aa3 100644 --- a/rules/testdata.go +++ b/rules/testdata.go @@ -25,7 +25,7 @@ var testStartTime = time.Time{} func getTestValueStream(startVal model.SampleValue, endVal model.SampleValue, - stepVal model.SampleValue) (resultValues []model.SamplePair) { + stepVal model.SampleValue) (resultValues model.Values) { currentTime := testStartTime for currentVal := startVal; currentVal <= endVal; currentVal += stepVal { sample := model.SamplePair{ diff --git a/storage/metric/interface.go b/storage/metric/interface.go index 757cd19db..1e49a5407 100644 --- a/storage/metric/interface.go +++ b/storage/metric/interface.go @@ -47,9 +47,9 @@ type MetricPersistence interface { // Get the metric associated with the provided fingerprint. GetMetricForFingerprint(model.Fingerprint) (*model.Metric, error) - GetValueAtTime(model.Fingerprint, time.Time) []model.SamplePair - GetBoundaryValues(model.Fingerprint, model.Interval) (first []model.SamplePair, second []model.SamplePair) - GetRangeValues(model.Fingerprint, model.Interval) []model.SamplePair + GetValueAtTime(model.Fingerprint, time.Time) model.Values + GetBoundaryValues(model.Fingerprint, model.Interval) (first model.Values, second model.Values) + GetRangeValues(model.Fingerprint, model.Interval) model.Values ForEachSample(IteratorsForFingerprintBuilder) (err error) @@ -64,9 +64,9 @@ type MetricPersistence interface { // View provides view of the values in the datastore subject to the request of a // preloading operation. type View interface { - GetValueAtTime(model.Fingerprint, time.Time) []model.SamplePair - GetBoundaryValues(model.Fingerprint, model.Interval) (first []model.SamplePair, second []model.SamplePair) - GetRangeValues(model.Fingerprint, model.Interval) []model.SamplePair + GetValueAtTime(model.Fingerprint, time.Time) model.Values + GetBoundaryValues(model.Fingerprint, model.Interval) (first model.Values, second model.Values) + GetRangeValues(model.Fingerprint, model.Interval) model.Values // Destroy this view. Close() diff --git a/storage/metric/leveldb.go b/storage/metric/leveldb.go index d3acbf46a..02ebdf18b 100644 --- a/storage/metric/leveldb.go +++ b/storage/metric/leveldb.go @@ -862,15 +862,15 @@ func (l *LevelDBMetricPersistence) GetMetricForFingerprint(f model.Fingerprint) return } -func (l LevelDBMetricPersistence) GetValueAtTime(f model.Fingerprint, t time.Time) (samples []model.SamplePair) { +func (l LevelDBMetricPersistence) GetValueAtTime(f model.Fingerprint, t time.Time) (samples model.Values) { panic("Not implemented") } -func (l LevelDBMetricPersistence) GetBoundaryValues(f model.Fingerprint, i model.Interval) (first []model.SamplePair, second []model.SamplePair) { +func (l LevelDBMetricPersistence) GetBoundaryValues(f model.Fingerprint, i model.Interval) (first model.Values, second model.Values) { panic("Not implemented") } -func (l *LevelDBMetricPersistence) GetRangeValues(f model.Fingerprint, i model.Interval) (samples []model.SamplePair) { +func (l *LevelDBMetricPersistence) GetRangeValues(f model.Fingerprint, i model.Interval) (samples model.Values) { panic("Not implemented") } diff --git a/storage/metric/memory.go b/storage/metric/memory.go index 08c80c84b..e577f1035 100644 --- a/storage/metric/memory.go +++ b/storage/metric/memory.go @@ -210,7 +210,7 @@ func (s memorySeriesStorage) GetMetricForFingerprint(f model.Fingerprint) (metri return } -func (s memorySeriesStorage) GetValueAtTime(f model.Fingerprint, t time.Time) (samples []model.SamplePair) { +func (s memorySeriesStorage) GetValueAtTime(f model.Fingerprint, t time.Time) (samples model.Values) { series, ok := s.fingerprintToSeries[f] if !ok { return @@ -251,13 +251,13 @@ func (s memorySeriesStorage) GetValueAtTime(f model.Fingerprint, t time.Time) (s return } -func (s memorySeriesStorage) GetBoundaryValues(f model.Fingerprint, i model.Interval) (first []model.SamplePair, second []model.SamplePair) { +func (s memorySeriesStorage) GetBoundaryValues(f model.Fingerprint, i model.Interval) (first model.Values, second model.Values) { first = s.GetValueAtTime(f, i.OldestInclusive) second = s.GetValueAtTime(f, i.NewestInclusive) return } -func (s memorySeriesStorage) GetRangeValues(f model.Fingerprint, i model.Interval) (samples []model.SamplePair) { +func (s memorySeriesStorage) GetRangeValues(f model.Fingerprint, i model.Interval) (samples model.Values) { series, ok := s.fingerprintToSeries[f] if !ok { return diff --git a/storage/metric/operation.go b/storage/metric/operation.go index 127143512..c3a5a5970 100644 --- a/storage/metric/operation.go +++ b/storage/metric/operation.go @@ -26,7 +26,7 @@ type op interface { // The time at which this operation starts. StartsAt() time.Time // Extract samples from stream of values and advance operation time. - ExtractSamples(in []model.SamplePair) (out []model.SamplePair) + ExtractSamples(in model.Values) (out model.Values) // Get current operation time or nil if no subsequent work associated with // this operator remains. CurrentTime() *time.Time @@ -73,7 +73,7 @@ func (g getValuesAtTimeOp) StartsAt() time.Time { return g.time } -func (g *getValuesAtTimeOp) ExtractSamples(in []model.SamplePair) (out []model.SamplePair) { +func (g *getValuesAtTimeOp) ExtractSamples(in model.Values) (out model.Values) { if len(in) == 0 { return } @@ -100,7 +100,7 @@ func (g getValuesAtTimeOp) GreedierThan(op op) (superior bool) { // are adjacent to it. // // An assumption of this is that the provided samples are already sorted! -func extractValuesAroundTime(t time.Time, in []model.SamplePair) (out []model.SamplePair) { +func extractValuesAroundTime(t time.Time, in model.Values) (out model.Values) { i := sort.Search(len(in), func(i int) bool { return !in[i].Timestamp.Before(t) }) @@ -151,7 +151,7 @@ func (g getValuesAtIntervalOp) Through() time.Time { return g.through } -func (g *getValuesAtIntervalOp) ExtractSamples(in []model.SamplePair) (out []model.SamplePair) { +func (g *getValuesAtIntervalOp) ExtractSamples(in model.Values) (out model.Values) { if len(in) == 0 { return } @@ -206,7 +206,7 @@ func (g getValuesAlongRangeOp) Through() time.Time { return g.through } -func (g *getValuesAlongRangeOp) ExtractSamples(in []model.SamplePair) (out []model.SamplePair) { +func (g *getValuesAlongRangeOp) ExtractSamples(in model.Values) (out model.Values) { if len(in) == 0 { return } diff --git a/storage/metric/operation_test.go b/storage/metric/operation_test.go index 27741f251..6ec5eae2d 100644 --- a/storage/metric/operation_test.go +++ b/storage/metric/operation_test.go @@ -1177,8 +1177,8 @@ func BenchmarkOptimize(b *testing.B) { func TestGetValuesAtTimeOp(t *testing.T) { var scenarios = []struct { op getValuesAtTimeOp - in []model.SamplePair - out []model.SamplePair + in model.Values + out model.Values }{ // No values. { @@ -1191,13 +1191,13 @@ func TestGetValuesAtTimeOp(t *testing.T) { op: getValuesAtTimeOp{ time: testInstant, }, - in: []model.SamplePair{ + in: model.Values{ { Timestamp: testInstant.Add(1 * time.Minute), Value: 1, }, }, - out: []model.SamplePair{ + out: model.Values{ { Timestamp: testInstant.Add(1 * time.Minute), Value: 1, @@ -1209,13 +1209,13 @@ func TestGetValuesAtTimeOp(t *testing.T) { op: getValuesAtTimeOp{ time: testInstant.Add(1 * time.Minute), }, - in: []model.SamplePair{ + in: model.Values{ { Timestamp: testInstant.Add(1 * time.Minute), Value: 1, }, }, - out: []model.SamplePair{ + out: model.Values{ { Timestamp: testInstant.Add(1 * time.Minute), Value: 1, @@ -1227,13 +1227,13 @@ func TestGetValuesAtTimeOp(t *testing.T) { op: getValuesAtTimeOp{ time: testInstant.Add(2 * time.Minute), }, - in: []model.SamplePair{ + in: model.Values{ { Timestamp: testInstant.Add(1 * time.Minute), Value: 1, }, }, - out: []model.SamplePair{ + out: model.Values{ { Timestamp: testInstant.Add(1 * time.Minute), Value: 1, @@ -1245,7 +1245,7 @@ func TestGetValuesAtTimeOp(t *testing.T) { op: getValuesAtTimeOp{ time: testInstant, }, - in: []model.SamplePair{ + in: model.Values{ { Timestamp: testInstant.Add(1 * time.Minute), Value: 1, @@ -1255,7 +1255,7 @@ func TestGetValuesAtTimeOp(t *testing.T) { Value: 1, }, }, - out: []model.SamplePair{ + out: model.Values{ { Timestamp: testInstant.Add(1 * time.Minute), Value: 1, @@ -1267,7 +1267,7 @@ func TestGetValuesAtTimeOp(t *testing.T) { op: getValuesAtTimeOp{ time: testInstant.Add(1 * time.Minute), }, - in: []model.SamplePair{ + in: model.Values{ { Timestamp: testInstant.Add(1 * time.Minute), Value: 1, @@ -1277,7 +1277,7 @@ func TestGetValuesAtTimeOp(t *testing.T) { Value: 1, }, }, - out: []model.SamplePair{ + out: model.Values{ { Timestamp: testInstant.Add(1 * time.Minute), Value: 1, @@ -1289,7 +1289,7 @@ func TestGetValuesAtTimeOp(t *testing.T) { op: getValuesAtTimeOp{ time: testInstant.Add(90 * time.Second), }, - in: []model.SamplePair{ + in: model.Values{ { Timestamp: testInstant.Add(1 * time.Minute), Value: 1, @@ -1299,7 +1299,7 @@ func TestGetValuesAtTimeOp(t *testing.T) { Value: 1, }, }, - out: []model.SamplePair{ + out: model.Values{ { Timestamp: testInstant.Add(1 * time.Minute), Value: 1, @@ -1315,7 +1315,7 @@ func TestGetValuesAtTimeOp(t *testing.T) { op: getValuesAtTimeOp{ time: testInstant.Add(2 * time.Minute), }, - in: []model.SamplePair{ + in: model.Values{ { Timestamp: testInstant.Add(1 * time.Minute), Value: 1, @@ -1325,7 +1325,7 @@ func TestGetValuesAtTimeOp(t *testing.T) { Value: 1, }, }, - out: []model.SamplePair{ + out: model.Values{ { Timestamp: testInstant.Add(1 * time.Minute), Value: 1, @@ -1341,7 +1341,7 @@ func TestGetValuesAtTimeOp(t *testing.T) { op: getValuesAtTimeOp{ time: testInstant.Add(3 * time.Minute), }, - in: []model.SamplePair{ + in: model.Values{ { Timestamp: testInstant.Add(1 * time.Minute), Value: 1, @@ -1351,7 +1351,7 @@ func TestGetValuesAtTimeOp(t *testing.T) { Value: 1, }, }, - out: []model.SamplePair{ + out: model.Values{ { Timestamp: testInstant.Add(2 * time.Minute), Value: 1, @@ -1376,8 +1376,8 @@ func TestGetValuesAtTimeOp(t *testing.T) { func TestGetValuesAtIntervalOp(t *testing.T) { var scenarios = []struct { op getValuesAtIntervalOp - in []model.SamplePair - out []model.SamplePair + in model.Values + out model.Values }{ // No values. { @@ -1394,7 +1394,7 @@ func TestGetValuesAtIntervalOp(t *testing.T) { through: testInstant.Add(1 * time.Minute), interval: 30 * time.Second, }, - in: []model.SamplePair{ + in: model.Values{ { Timestamp: testInstant.Add(2 * time.Minute), Value: 1, @@ -1404,7 +1404,7 @@ func TestGetValuesAtIntervalOp(t *testing.T) { Value: 1, }, }, - out: []model.SamplePair{ + out: model.Values{ { Timestamp: testInstant.Add(2 * time.Minute), Value: 1, @@ -1418,7 +1418,7 @@ func TestGetValuesAtIntervalOp(t *testing.T) { through: testInstant.Add(2 * time.Minute), interval: 30 * time.Second, }, - in: []model.SamplePair{ + in: model.Values{ { Timestamp: testInstant.Add(1 * time.Minute), Value: 1, @@ -1428,7 +1428,7 @@ func TestGetValuesAtIntervalOp(t *testing.T) { Value: 1, }, }, - out: []model.SamplePair{ + out: model.Values{ { Timestamp: testInstant.Add(1 * time.Minute), Value: 1, @@ -1446,7 +1446,7 @@ func TestGetValuesAtIntervalOp(t *testing.T) { through: testInstant.Add(2 * time.Minute), interval: 30 * time.Second, }, - in: []model.SamplePair{ + in: model.Values{ { Timestamp: testInstant, Value: 1, @@ -1460,7 +1460,7 @@ func TestGetValuesAtIntervalOp(t *testing.T) { Value: 1, }, }, - out: []model.SamplePair{ + out: model.Values{ { Timestamp: testInstant.Add(1 * time.Minute), Value: 1, @@ -1478,7 +1478,7 @@ func TestGetValuesAtIntervalOp(t *testing.T) { through: testInstant.Add(3 * time.Minute), interval: 30 * time.Second, }, - in: []model.SamplePair{ + in: model.Values{ { Timestamp: testInstant.Add(1 * time.Minute), Value: 1, @@ -1488,7 +1488,7 @@ func TestGetValuesAtIntervalOp(t *testing.T) { Value: 1, }, }, - out: []model.SamplePair{ + out: model.Values{ { Timestamp: testInstant.Add(1 * time.Minute), Value: 1, @@ -1506,7 +1506,7 @@ func TestGetValuesAtIntervalOp(t *testing.T) { through: testInstant.Add(4 * time.Minute), interval: 30 * time.Second, }, - in: []model.SamplePair{ + in: model.Values{ { Timestamp: testInstant, Value: 1, @@ -1524,7 +1524,7 @@ func TestGetValuesAtIntervalOp(t *testing.T) { Value: 1, }, }, - out: []model.SamplePair{ + out: model.Values{ { Timestamp: testInstant.Add(2 * time.Minute), Value: 1, @@ -1542,7 +1542,7 @@ func TestGetValuesAtIntervalOp(t *testing.T) { through: testInstant.Add(3 * time.Minute), interval: 30 * time.Second, }, - in: []model.SamplePair{ + in: model.Values{ { Timestamp: testInstant, Value: 1, @@ -1552,7 +1552,7 @@ func TestGetValuesAtIntervalOp(t *testing.T) { Value: 1, }, }, - out: []model.SamplePair{ + out: model.Values{ { Timestamp: testInstant.Add(1 * time.Minute), Value: 1, @@ -1577,8 +1577,8 @@ func TestGetValuesAtIntervalOp(t *testing.T) { func TestGetValuesAlongRangeOp(t *testing.T) { var scenarios = []struct { op getValuesAlongRangeOp - in []model.SamplePair - out []model.SamplePair + in model.Values + out model.Values }{ // No values. { @@ -1593,7 +1593,7 @@ func TestGetValuesAlongRangeOp(t *testing.T) { from: testInstant, through: testInstant.Add(1 * time.Minute), }, - in: []model.SamplePair{ + in: model.Values{ { Timestamp: testInstant.Add(2 * time.Minute), Value: 1, @@ -1603,7 +1603,7 @@ func TestGetValuesAlongRangeOp(t *testing.T) { Value: 1, }, }, - out: []model.SamplePair{}, + out: model.Values{}, }, // Operator range starts before first value, ends within available values. { @@ -1611,7 +1611,7 @@ func TestGetValuesAlongRangeOp(t *testing.T) { from: testInstant, through: testInstant.Add(2 * time.Minute), }, - in: []model.SamplePair{ + in: model.Values{ { Timestamp: testInstant.Add(1 * time.Minute), Value: 1, @@ -1621,7 +1621,7 @@ func TestGetValuesAlongRangeOp(t *testing.T) { Value: 1, }, }, - out: []model.SamplePair{ + out: model.Values{ { Timestamp: testInstant.Add(1 * time.Minute), Value: 1, @@ -1634,7 +1634,7 @@ func TestGetValuesAlongRangeOp(t *testing.T) { from: testInstant.Add(1 * time.Minute), through: testInstant.Add(2 * time.Minute), }, - in: []model.SamplePair{ + in: model.Values{ { Timestamp: testInstant, Value: 1, @@ -1648,7 +1648,7 @@ func TestGetValuesAlongRangeOp(t *testing.T) { Value: 1, }, }, - out: []model.SamplePair{ + out: model.Values{ { Timestamp: testInstant.Add(1 * time.Minute), Value: 1, @@ -1661,7 +1661,7 @@ func TestGetValuesAlongRangeOp(t *testing.T) { from: testInstant, through: testInstant.Add(3 * time.Minute), }, - in: []model.SamplePair{ + in: model.Values{ { Timestamp: testInstant.Add(1 * time.Minute), Value: 1, @@ -1671,7 +1671,7 @@ func TestGetValuesAlongRangeOp(t *testing.T) { Value: 1, }, }, - out: []model.SamplePair{ + out: model.Values{ { Timestamp: testInstant.Add(1 * time.Minute), Value: 1, @@ -1688,7 +1688,7 @@ func TestGetValuesAlongRangeOp(t *testing.T) { from: testInstant.Add(2 * time.Minute), through: testInstant.Add(4 * time.Minute), }, - in: []model.SamplePair{ + in: model.Values{ { Timestamp: testInstant, Value: 1, @@ -1706,7 +1706,7 @@ func TestGetValuesAlongRangeOp(t *testing.T) { Value: 1, }, }, - out: []model.SamplePair{ + out: model.Values{ { Timestamp: testInstant.Add(2 * time.Minute), Value: 1, @@ -1723,7 +1723,7 @@ func TestGetValuesAlongRangeOp(t *testing.T) { from: testInstant.Add(2 * time.Minute), through: testInstant.Add(3 * time.Minute), }, - in: []model.SamplePair{ + in: model.Values{ { Timestamp: testInstant, Value: 1, @@ -1733,7 +1733,7 @@ func TestGetValuesAlongRangeOp(t *testing.T) { Value: 1, }, }, - out: []model.SamplePair{}, + out: model.Values{}, }, } for i, scenario := range scenarios { diff --git a/storage/metric/stochastic_test.go b/storage/metric/stochastic_test.go index 1c22a0ee3..36314847b 100644 --- a/storage/metric/stochastic_test.go +++ b/storage/metric/stochastic_test.go @@ -188,7 +188,7 @@ func AppendSampleAsPureSingleEntityAppendTests(p MetricPersistence, t test.Teste } } -func levelDBGetRangeValues(l *LevelDBMetricPersistence, fp model.Fingerprint, i model.Interval) (samples []model.SamplePair, err error) { +func levelDBGetRangeValues(l *LevelDBMetricPersistence, fp model.Fingerprint, i model.Interval) (samples model.Values, err error) { begin := time.Now() defer func() { @@ -465,7 +465,7 @@ func StochasticTests(persistenceMaker func() (MetricPersistence, test.Closer), t NewestInclusive: time.Unix(end, 0), } - samples := []model.SamplePair{} + samples := model.Values{} fp := model.NewFingerprintFromMetric(metric) switch persistence := p.(type) { case *LevelDBMetricPersistence: diff --git a/storage/metric/tiered.go b/storage/metric/tiered.go index d4c30d24d..9318fe9a3 100644 --- a/storage/metric/tiered.go +++ b/storage/metric/tiered.go @@ -370,19 +370,14 @@ func (t *tieredStorage) renderView(viewJob viewJob) { if err != nil { panic(err) } - if t.diskFrontier == nil { - // Storage still empty, return an empty view. - viewJob.output <- view - return - } for _, scanJob := range scans { - seriesFrontier, err := newSeriesFrontier(scanJob.fingerprint, *t.diskFrontier, iterator) - if err != nil { - panic(err) - } - if seriesFrontier == nil { - continue + var seriesFrontier *seriesFrontier = nil + if t.diskFrontier != nil { + seriesFrontier, err = newSeriesFrontier(scanJob.fingerprint, *t.diskFrontier, iterator) + if err != nil { + panic(err) + } } standingOps := scanJob.operations @@ -393,25 +388,47 @@ func (t *tieredStorage) renderView(viewJob viewJob) { } // Load data value chunk(s) around the first standing op's current time. - highWatermark := *standingOps[0].CurrentTime() - // XXX: For earnest performance gains analagous to the benchmarking we - // performed, chunk should only be reloaded if it no longer contains - // the values we're looking for. - // - // To better understand this, look at https://github.com/prometheus/prometheus/blob/benchmark/leveldb/iterator-seek-characteristics/leveldb.go#L239 and note the behavior around retrievedValue. - chunk := t.loadChunkAroundTime(iterator, seriesFrontier, scanJob.fingerprint, highWatermark) + targetTime := *standingOps[0].CurrentTime() + + chunk := model.Values{} + memValues := t.memoryArena.GetValueAtTime(scanJob.fingerprint, targetTime) + // If we aimed before the oldest value in memory, load more data from disk. + if (len(memValues) == 0 || memValues.FirstTimeAfter(targetTime)) && seriesFrontier != nil { + // XXX: For earnest performance gains analagous to the benchmarking we + // performed, chunk should only be reloaded if it no longer contains + // the values we're looking for. + // + // To better understand this, look at https://github.com/prometheus/prometheus/blob/benchmark/leveldb/iterator-seek-characteristics/leveldb.go#L239 and note the behavior around retrievedValue. + diskValues := t.loadChunkAroundTime(iterator, seriesFrontier, scanJob.fingerprint, targetTime) + + // If we aimed past the newest value on disk, combine it with the next value from memory. + if len(memValues) > 0 && diskValues.LastTimeBefore(targetTime) { + latestDiskValue := diskValues[len(diskValues)-1 : len(diskValues)] + chunk = append(latestDiskValue, memValues...) + } else { + chunk = diskValues + } + } else { + chunk = memValues + } + + // There's no data at all for this fingerprint, so stop processing ops for it. + if len(chunk) == 0 { + break + } + lastChunkTime := chunk[len(chunk)-1].Timestamp - if lastChunkTime.After(highWatermark) { - highWatermark = lastChunkTime + if lastChunkTime.After(targetTime) { + targetTime = lastChunkTime } // For each op, extract all needed data from the current chunk. - out := []model.SamplePair{} + out := model.Values{} for _, op := range standingOps { - if op.CurrentTime().After(highWatermark) { + if op.CurrentTime().After(targetTime) { break } - for op.CurrentTime() != nil && !op.CurrentTime().After(highWatermark) { + for op.CurrentTime() != nil && !op.CurrentTime().After(targetTime) { out = op.ExtractSamples(chunk) } } @@ -450,7 +467,7 @@ func (t *tieredStorage) renderView(viewJob viewJob) { return } -func (t *tieredStorage) loadChunkAroundTime(iterator leveldb.Iterator, frontier *seriesFrontier, fingerprint model.Fingerprint, ts time.Time) (chunk []model.SamplePair) { +func (t *tieredStorage) loadChunkAroundTime(iterator leveldb.Iterator, frontier *seriesFrontier, fingerprint model.Fingerprint, ts time.Time) (chunk model.Values) { var ( targetKey = &dto.SampleKey{ Fingerprint: fingerprint.ToDTO(), diff --git a/storage/metric/tiered_test.go b/storage/metric/tiered_test.go index 070bc4565..7f9ae57e5 100644 --- a/storage/metric/tiered_test.go +++ b/storage/metric/tiered_test.go @@ -69,7 +69,7 @@ func buildSamples(from, to time.Time, interval time.Duration, m model.Metric) (v return } -func testMakeView(t test.Tester) { +func testMakeView(t test.Tester, flushToDisk bool) { type in struct { atTime []getValuesAtTimeOp atInterval []getValuesAtIntervalOp @@ -77,9 +77,9 @@ func testMakeView(t test.Tester) { } type out struct { - atTime [][]model.SamplePair - atInterval [][]model.SamplePair - alongRange [][]model.SamplePair + atTime []model.Values + atInterval []model.Values + alongRange []model.Values } var ( instant = time.Date(1984, 3, 30, 0, 0, 0, 0, time.Local) @@ -100,7 +100,7 @@ func testMakeView(t test.Tester) { }, }, out: out{ - atTime: [][]model.SamplePair{{}}, + atTime: []model.Values{{}}, }, }, // Single sample, query asks for exact sample time. @@ -120,7 +120,7 @@ func testMakeView(t test.Tester) { }, }, out: out{ - atTime: [][]model.SamplePair{ + atTime: []model.Values{ { { Timestamp: instant, @@ -152,7 +152,7 @@ func testMakeView(t test.Tester) { }, }, out: out{ - atTime: [][]model.SamplePair{ + atTime: []model.Values{ { { Timestamp: instant.Add(time.Second), @@ -179,7 +179,7 @@ func testMakeView(t test.Tester) { }, }, out: out{ - atTime: [][]model.SamplePair{ + atTime: []model.Values{ { { Timestamp: instant, @@ -211,7 +211,7 @@ func testMakeView(t test.Tester) { }, }, out: out{ - atTime: [][]model.SamplePair{ + atTime: []model.Values{ { { Timestamp: instant, @@ -248,7 +248,7 @@ func testMakeView(t test.Tester) { }, }, out: out{ - atTime: [][]model.SamplePair{ + atTime: []model.Values{ { { Timestamp: instant.Add(time.Second), @@ -285,7 +285,7 @@ func testMakeView(t test.Tester) { }, }, out: out{ - atTime: [][]model.SamplePair{ + atTime: []model.Values{ { { Timestamp: instant, @@ -326,7 +326,7 @@ func testMakeView(t test.Tester) { }, }, out: out{ - atTime: [][]model.SamplePair{ + atTime: []model.Values{ { { Timestamp: instant.Add(time.Second * 2), @@ -351,7 +351,7 @@ func testMakeView(t test.Tester) { }, }, out: out{ - atTime: [][]model.SamplePair{ + atTime: []model.Values{ { { Timestamp: instant.Add(time.Second * time.Duration(*leveldbChunkSize/2)), @@ -378,7 +378,11 @@ func testMakeView(t test.Tester) { } } - tiered.Flush() + if flushToDisk { + tiered.Flush() + } else { + tiered.(*tieredStorage).writeMemory() + } requestBuilder := NewViewRequestBuilder() @@ -421,13 +425,23 @@ func testMakeView(t test.Tester) { } } -func TestMakeView(t *testing.T) { - testMakeView(t) +func TestMakeViewFlush(t *testing.T) { + testMakeView(t, true) } -func BenchmarkMakeView(b *testing.B) { +func BenchmarkMakeViewFlush(b *testing.B) { for i := 0; i < b.N; i++ { - testMakeView(b) + testMakeView(b, true) + } +} + +func TestMakeViewNoFlush(t *testing.T) { + testMakeView(t, false) +} + +func BenchmarkMakeViewNoFlush(b *testing.B) { + for i := 0; i < b.N; i++ { + testMakeView(b, false) } }