diff --git a/promql/analyzer.go b/promql/analyzer.go index a10656e1a1..6e052656da 100644 --- a/promql/analyzer.go +++ b/promql/analyzer.go @@ -125,26 +125,45 @@ func (a *Analyzer) Prepare(ctx context.Context) (local.Preloader, error) { }() // Preload all analyzed ranges. + iters := map[time.Duration]map[model.Fingerprint]local.SeriesIterator{} for offset, pt := range a.offsetPreloadTimes { + itersForDuration := map[model.Fingerprint]local.SeriesIterator{} + iters[offset] = itersForDuration start := a.Start.Add(-offset) end := a.End.Add(-offset) for fp, rangeDuration := range pt.ranges { if err = contextDone(ctx, env); err != nil { return nil, err } - err = p.PreloadRange(fp, start.Add(-rangeDuration), end, StalenessDelta) + startOfRange := start.Add(-rangeDuration) + if StalenessDelta > rangeDuration { + // Cover a weird corner case: The expression + // mixes up instants and ranges for the same + // series. We'll handle that over-all as + // range. But if the rangeDuration is smaller + // than the StalenessDelta, the range wouldn't + // cover everything potentially needed for the + // instant, so we have to extend startOfRange. + startOfRange = start.Add(-StalenessDelta) + } + iter, err := p.PreloadRange(fp, startOfRange, end) if err != nil { return nil, err } + itersForDuration[fp] = iter } for fp := range pt.instants { if err = contextDone(ctx, env); err != nil { return nil, err } - err = p.PreloadRange(fp, start, end, StalenessDelta) + // Need to look backwards by StalenessDelta but not + // forward because we always return the closest sample + // _before_ the reference time. + iter, err := p.PreloadRange(fp, start.Add(-StalenessDelta), end) if err != nil { return nil, err } + itersForDuration[fp] = iter } } @@ -153,11 +172,11 @@ func (a *Analyzer) Prepare(ctx context.Context) (local.Preloader, error) { switch n := node.(type) { case *VectorSelector: for fp := range n.metrics { - n.iterators[fp] = a.Storage.NewIterator(fp) + n.iterators[fp] = iters[n.Offset][fp] } case *MatrixSelector: for fp := range n.metrics { - n.iterators[fp] = a.Storage.NewIterator(fp) + n.iterators[fp] = iters[n.Offset][fp] } } return true diff --git a/promql/engine.go b/promql/engine.go index 1927a41ab0..599486cae4 100644 --- a/promql/engine.go +++ b/promql/engine.go @@ -688,15 +688,16 @@ func (ev *evaluator) eval(expr Expr) model.Value { func (ev *evaluator) vectorSelector(node *VectorSelector) vector { vec := vector{} for fp, it := range node.iterators { - sampleCandidates := it.ValueAtTime(ev.Timestamp.Add(-node.Offset)) - samplePair := chooseClosestBefore(sampleCandidates, ev.Timestamp.Add(-node.Offset)) - if samplePair != nil { - vec = append(vec, &sample{ - Metric: node.metrics[fp], - Value: samplePair.Value, - Timestamp: ev.Timestamp, - }) + refTime := ev.Timestamp.Add(-node.Offset) + samplePair := it.ValueAtOrBeforeTime(refTime) + if samplePair.Timestamp.Before(refTime.Add(-StalenessDelta)) { + continue // Sample outside of staleness policy window. } + vec = append(vec, &sample{ + Metric: node.metrics[fp], + Value: samplePair.Value, + Timestamp: ev.Timestamp, + }) } return vec } @@ -1168,23 +1169,6 @@ func shouldDropMetricName(op itemType) bool { // series is considered stale. var StalenessDelta = 5 * time.Minute -// chooseClosestBefore chooses the closest sample of a list of samples -// before or at a given target time. 
-func chooseClosestBefore(samples []model.SamplePair, timestamp model.Time) *model.SamplePair { - for _, candidate := range samples { - delta := candidate.Timestamp.Sub(timestamp) - // Samples before or at target time. - if delta <= 0 { - // Ignore samples outside of staleness policy window. - if -delta > StalenessDelta { - continue - } - return &candidate - } - } - return nil -} - // A queryGate controls the maximum number of concurrently running and waiting queries. type queryGate struct { ch chan struct{} diff --git a/promql/functions.go b/promql/functions.go index 09cde741c2..4728b1e96e 100644 --- a/promql/functions.go +++ b/promql/functions.go @@ -583,6 +583,11 @@ func funcPredictLinear(ev *evaluator, args Expressions) model.Value { } // add predicted delta to last value. + // TODO(beorn7): This is arguably suboptimal. The funcDeriv above has + // given us an estimate over the range. So we should add the delta to + // the value predicted for the end of the range. Also, once this has + // been rectified, we are not using BoundaryValues anywhere anymore, so + // we can kick out a whole lot of code. matrixBounds := ev.evalMatrixBounds(args[0]) outVec := make(vector, 0, len(signatureToDelta)) for _, samples := range matrixBounds { diff --git a/promql/testdata/selectors.test b/promql/testdata/selectors.test new file mode 100644 index 0000000000..f7eee977bc --- /dev/null +++ b/promql/testdata/selectors.test @@ -0,0 +1,37 @@ +load 10s + http_requests{job="api-server", instance="0", group="production"} 0+10x1000 100+30x1000 + http_requests{job="api-server", instance="1", group="production"} 0+20x1000 200+30x1000 + http_requests{job="api-server", instance="0", group="canary"} 0+30x1000 300+80x1000 + http_requests{job="api-server", instance="1", group="canary"} 0+40x2000 + +eval instant at 8000s rate(http_requests[1m]) + {job="api-server", instance="0", group="production"} 1 + {job="api-server", instance="1", group="production"} 2 + {job="api-server", instance="0", group="canary"} 3 + {job="api-server", instance="1", group="canary"} 4 + +eval instant at 18000s rate(http_requests[1m]) + {job="api-server", instance="0", group="production"} 3 + {job="api-server", instance="1", group="production"} 3 + {job="api-server", instance="0", group="canary"} 8 + {job="api-server", instance="1", group="canary"} 4 + +eval instant at 8000s rate(http_requests{group=~"pro.*"}[1m]) + {job="api-server", instance="0", group="production"} 1 + {job="api-server", instance="1", group="production"} 2 + +eval instant at 18000s rate(http_requests{group=~".*ry", instance="1"}[1m]) + {job="api-server", instance="1", group="canary"} 4 + +eval instant at 18000s rate(http_requests{instance!="3"}[1m] offset 10000s) + {job="api-server", instance="0", group="production"} 1 + {job="api-server", instance="1", group="production"} 2 + {job="api-server", instance="0", group="canary"} 3 + {job="api-server", instance="1", group="canary"} 4 + +eval instant at 18000s rate(http_requests[40s]) - rate(http_requests[1m] offset 10000s) + {job="api-server", instance="0", group="production"} 2 + {job="api-server", instance="1", group="production"} 1 + {job="api-server", instance="0", group="canary"} 5 + {job="api-server", instance="1", group="canary"} 0 + diff --git a/storage/local/chunk.go b/storage/local/chunk.go index 1a2a9a9836..e0be10b142 100644 --- a/storage/local/chunk.go +++ b/storage/local/chunk.go @@ -53,13 +53,46 @@ const ( doubleDelta ) -// chunkDesc contains meta-data for a chunk. 
Many of its methods are -// goroutine-safe proxies for chunk methods. +// chunkDesc contains meta-data for a chunk. Pay special attention to the +// documented requirements for calling its methods concurrently (WRT pinning and +// locking). The doc comments spell out the requirements for each method, but +// here is an overview and general explanation: +// +// Everything that changes the pinning of the underlying chunk or deals with its +// eviction is protected by a mutex. This affects the following methods: pin, +// unpin, refCount, isEvicted, maybeEvict. These methods can be called at any +// time without further prerequisites. +// +// Another group of methods acts on (or sets) the underlying chunk. These +// methods involve no locking. They may only be called if the caller has pinned +// the chunk (to guarantee the chunk is not evicted concurrently). Also, the +// caller must make sure nobody else will call these methods concurrently, +// either by holding the sole reference to the chunkDesc (usually during loading +// or creation) or by locking the fingerprint of the series the chunkDesc +// belongs to. The affected methods are: add, maybePopulateLastTime, setChunk. +// +// Finally, there are the special cases firstTime and lastTime. lastTime requires +// to have locked the fingerprint of the series but the chunk does not need to +// be pinned. That's because the chunkLastTime field in chunkDesc gets populated +// upon completion of the chunk (when it is still pinned, and which happens +// while the series's fingerprint is locked). Once that has happened, calling +// lastTime does not require the chunk to be loaded anymore. Before that has +// happened, the chunk is pinned anyway. The chunkFirstTime field in chunkDesc +// is populated upon creation of a chunkDesc, so it is alway safe to call +// firstTime. The firstTime method is arguably not needed and only there for +// consistency with lastTime. +// +// Yet another (deprecated) case is lastSamplePair. It's used in federation and +// must be callable without pinning. Locking the fingerprint of the series is +// still required (to avoid concurrent appends to the chunk). The call is +// relatively expensive because of the required acquisition of the evict +// mutex. It will go away, though, once tracking the lastSamplePair has been +// moved into the series object. type chunkDesc struct { - sync.Mutex // TODO(beorn7): Try out if an RWMutex would help here. + sync.Mutex // Protects pinning. c chunk // nil if chunk is evicted. rCnt int - chunkFirstTime model.Time // Populated at creation. + chunkFirstTime model.Time // Populated at creation. Immutable. chunkLastTime model.Time // Populated on closing of the chunk, model.Earliest if unset. // evictListElement is nil if the chunk is not in the evict list. @@ -83,16 +116,17 @@ func newChunkDesc(c chunk, firstTime model.Time) *chunkDesc { } } +// add adds a sample pair to the underlying chunk. For safe concurrent access, +// The chunk must be pinned, and the caller must have locked the fingerprint of +// the series. func (cd *chunkDesc) add(s *model.SamplePair) []chunk { - cd.Lock() - defer cd.Unlock() - return cd.c.add(s) } // pin increments the refCount by one. Upon increment from 0 to 1, this // chunkDesc is removed from the evict list. To enable the latter, the -// evictRequests channel has to be provided. +// evictRequests channel has to be provided. This method can be called +// concurrently at any time. 
func (cd *chunkDesc) pin(evictRequests chan<- evictRequest) { cd.Lock() defer cd.Unlock() @@ -106,7 +140,8 @@ func (cd *chunkDesc) pin(evictRequests chan<- evictRequest) { // unpin decrements the refCount by one. Upon decrement from 1 to 0, this // chunkDesc is added to the evict list. To enable the latter, the evictRequests -// channel has to be provided. +// channel has to be provided. This method can be called concurrently at any +// time. func (cd *chunkDesc) unpin(evictRequests chan<- evictRequest) { cd.Lock() defer cd.Unlock() @@ -121,6 +156,8 @@ func (cd *chunkDesc) unpin(evictRequests chan<- evictRequest) { } } +// refCount returns the number of pins. This method can be called concurrently +// at any time. func (cd *chunkDesc) refCount() int { cd.Lock() defer cd.Unlock() @@ -128,30 +165,39 @@ func (cd *chunkDesc) refCount() int { return cd.rCnt } +// firstTime returns the timestamp of the first sample in the chunk. This method +// can be called concurrently at any time. It only returns the immutable +// cd.chunkFirstTime without any locking. Arguably, this method is +// useless. However, it provides consistency with the lastTime method. func (cd *chunkDesc) firstTime() model.Time { - // No lock required, will never be modified. return cd.chunkFirstTime } +// lastTime returns the timestamp of the last sample in the chunk. For safe +// concurrent access, this method requires the fingerprint of the time series to +// be locked. func (cd *chunkDesc) lastTime() model.Time { - cd.Lock() - defer cd.Unlock() - if cd.chunkLastTime != model.Earliest || cd.c == nil { return cd.chunkLastTime } return cd.c.newIterator().lastTimestamp() } +// maybePopulateLastTime populates the chunkLastTime from the underlying chunk +// if it has not yet happened. Call this method directly after having added the +// last sample to a chunk or after closing a head chunk due to age. For safe +// concurrent access, the chunk must be pinned, and the caller must have locked +// the fingerprint of the series. func (cd *chunkDesc) maybePopulateLastTime() { - cd.Lock() - defer cd.Unlock() - if cd.chunkLastTime == model.Earliest && cd.c != nil { cd.chunkLastTime = cd.c.newIterator().lastTimestamp() } } +// lastSamplePair returns the last sample pair of the underlying chunk, or nil +// if the chunk is evicted. For safe concurrent access, this method requires the +// fingerprint of the time series to be locked. +// TODO(beorn7): Move up into memorySeries. func (cd *chunkDesc) lastSamplePair() *model.SamplePair { cd.Lock() defer cd.Unlock() @@ -166,28 +212,22 @@ func (cd *chunkDesc) lastSamplePair() *model.SamplePair { } } +// isEvicted returns whether the chunk is evicted. For safe concurrent access, +// the caller must have locked the fingerprint of the series. func (cd *chunkDesc) isEvicted() bool { + // Locking required here because we do not want the caller to force + // pinning the chunk first, so it could be evicted while this method is + // called. cd.Lock() defer cd.Unlock() return cd.c == nil } -func (cd *chunkDesc) contains(t model.Time) bool { - return !t.Before(cd.firstTime()) && !t.After(cd.lastTime()) -} - -func (cd *chunkDesc) chunk() chunk { - cd.Lock() - defer cd.Unlock() - - return cd.c -} - +// setChunk sets the underlying chunk. The caller must have locked the +// fingerprint of the series and must have "pre-pinned" the chunk (i.e. first +// call pin and then set the chunk). 
func (cd *chunkDesc) setChunk(c chunk) { - cd.Lock() - defer cd.Unlock() - if cd.c != nil { panic("chunk already set") } @@ -196,7 +236,7 @@ func (cd *chunkDesc) setChunk(c chunk) { // maybeEvict evicts the chunk if the refCount is 0. It returns whether the chunk // is now evicted, which includes the case that the chunk was evicted even -// before this method was called. +// before this method was called. It can be called concurrently at any time. func (cd *chunkDesc) maybeEvict() bool { cd.Lock() defer cd.Unlock() @@ -207,9 +247,9 @@ func (cd *chunkDesc) maybeEvict() bool { if cd.rCnt != 0 { return false } - // Last opportunity to populate chunkLastTime. if cd.chunkLastTime == model.Earliest { - cd.chunkLastTime = cd.c.newIterator().lastTimestamp() + // This must never happen. + panic("chunkLastTime not populated for evicted chunk") } cd.c = nil chunkOps.WithLabelValues(evict).Inc() @@ -251,12 +291,11 @@ type chunkIterator interface { sampleValueAtIndex(int) model.SampleValue // Gets the last sample value in the chunk. lastSampleValue() model.SampleValue - // Gets the two values that are immediately adjacent to a given time. In - // case a value exist at precisely the given time, only that single - // value is returned. Only the first or last value is returned (as a - // single value), if the given time is before or after the first or last - // value, respectively. - valueAtTime(model.Time) []model.SamplePair + // Gets the value that is closest before the given time. In case a value + // exists at precisely the given time, that value is returned. If no + // applicable value exists, a SamplePair with timestamp model.Earliest + // and value 0.0 is returned. + valueAtOrBeforeTime(model.Time) model.SamplePair // Gets all values contained within a given interval. rangeValues(metric.Interval) []model.SamplePair // Whether a given timestamp is contained between first and last value diff --git a/storage/local/delta.go b/storage/local/delta.go index 5d069afd6b..7222c5a157 100644 --- a/storage/local/delta.go +++ b/storage/local/delta.go @@ -301,41 +301,17 @@ type deltaEncodedChunkIterator struct { // length implements chunkIterator. func (it *deltaEncodedChunkIterator) length() int { return it.len } -// valueAtTime implements chunkIterator. -func (it *deltaEncodedChunkIterator) valueAtTime(t model.Time) []model.SamplePair { +// valueAtOrBeforeTime implements chunkIterator. 
+func (it *deltaEncodedChunkIterator) valueAtOrBeforeTime(t model.Time) model.SamplePair { i := sort.Search(it.len, func(i int) bool { - return !it.timestampAtIndex(i).Before(t) + return it.timestampAtIndex(i).After(t) }) - - switch i { - case 0: - return []model.SamplePair{{ - Timestamp: it.timestampAtIndex(0), - Value: it.sampleValueAtIndex(0), - }} - case it.len: - return []model.SamplePair{{ - Timestamp: it.timestampAtIndex(it.len - 1), - Value: it.sampleValueAtIndex(it.len - 1), - }} - default: - ts := it.timestampAtIndex(i) - if ts.Equal(t) { - return []model.SamplePair{{ - Timestamp: ts, - Value: it.sampleValueAtIndex(i), - }} - } - return []model.SamplePair{ - { - Timestamp: it.timestampAtIndex(i - 1), - Value: it.sampleValueAtIndex(i - 1), - }, - { - Timestamp: ts, - Value: it.sampleValueAtIndex(i), - }, - } + if i == 0 { + return model.SamplePair{Timestamp: model.Earliest} + } + return model.SamplePair{ + Timestamp: it.timestampAtIndex(i - 1), + Value: it.sampleValueAtIndex(i - 1), } } diff --git a/storage/local/doubledelta.go b/storage/local/doubledelta.go index eaf3093cb9..e0e0856ce6 100644 --- a/storage/local/doubledelta.go +++ b/storage/local/doubledelta.go @@ -407,41 +407,17 @@ type doubleDeltaEncodedChunkIterator struct { // length implements chunkIterator. func (it *doubleDeltaEncodedChunkIterator) length() int { return it.len } -// valueAtTime implements chunkIterator. -func (it *doubleDeltaEncodedChunkIterator) valueAtTime(t model.Time) []model.SamplePair { +// valueAtOrBeforeTime implements chunkIterator. +func (it *doubleDeltaEncodedChunkIterator) valueAtOrBeforeTime(t model.Time) model.SamplePair { i := sort.Search(it.len, func(i int) bool { - return !it.timestampAtIndex(i).Before(t) + return it.timestampAtIndex(i).After(t) }) - - switch i { - case 0: - return []model.SamplePair{{ - Timestamp: it.timestampAtIndex(0), - Value: it.sampleValueAtIndex(0), - }} - case it.len: - return []model.SamplePair{{ - Timestamp: it.timestampAtIndex(it.len - 1), - Value: it.sampleValueAtIndex(it.len - 1), - }} - default: - ts := it.timestampAtIndex(i) - if ts.Equal(t) { - return []model.SamplePair{{ - Timestamp: ts, - Value: it.sampleValueAtIndex(i), - }} - } - return []model.SamplePair{ - { - Timestamp: it.timestampAtIndex(i - 1), - Value: it.sampleValueAtIndex(i - 1), - }, - { - Timestamp: ts, - Value: it.sampleValueAtIndex(i), - }, - } + if i == 0 { + return model.SamplePair{Timestamp: model.Earliest} + } + return model.SamplePair{ + Timestamp: it.timestampAtIndex(i - 1), + Value: it.sampleValueAtIndex(i - 1), } } diff --git a/storage/local/interface.go b/storage/local/interface.go index 454c2d9d5f..005b397266 100644 --- a/storage/local/interface.go +++ b/storage/local/interface.go @@ -14,8 +14,6 @@ package local import ( - "time" - "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" @@ -52,10 +50,6 @@ type Storage interface { LabelValuesForLabelName(model.LabelName) model.LabelValues // Get the metric associated with the provided fingerprint. MetricForFingerprint(model.Fingerprint) metric.Metric - // Construct an iterator for a given fingerprint. - // The iterator will never return samples older than retention time, - // relative to the time NewIterator was called. - NewIterator(model.Fingerprint) SeriesIterator // Drop all time series associated with the given fingerprints. DropMetricsForFingerprints(...model.Fingerprint) // Run the various maintenance loops in goroutines. 
Returns when the @@ -77,12 +71,11 @@ type Storage interface { // modifying the corresponding series, but the iterator will represent the state // of the series prior the modification. type SeriesIterator interface { - // Gets the two values that are immediately adjacent to a given time. In - // case a value exist at precisely the given time, only that single - // value is returned. Only the first or last value is returned (as a - // single value), if the given time is before or after the first or last - // value, respectively. - ValueAtTime(model.Time) []model.SamplePair + // Gets the value that is closest before the given time. In case a value + // exists at precisely the given time, that value is returned. If no + // applicable value exists, a SamplePair with timestamp model.Earliest + // and value 0.0 is returned. + ValueAtOrBeforeTime(model.Time) model.SamplePair // Gets the boundary values of an interval: the first and last value // within a given interval. BoundaryValues(metric.Interval) []model.SamplePair @@ -90,15 +83,14 @@ type SeriesIterator interface { RangeValues(metric.Interval) []model.SamplePair } -// A Preloader preloads series data necessary for a query into memory and pins -// them until released via Close(). Its methods are generally not -// goroutine-safe. +// A Preloader preloads series data necessary for a query into memory, pins it +// until released via Close(), and returns an iterator for the pinned data. Its +// methods are generally not goroutine-safe. type Preloader interface { PreloadRange( fp model.Fingerprint, from model.Time, through model.Time, - stalenessDelta time.Duration, - ) error + ) (SeriesIterator, error) // Close unpins any previously requested series data from memory. Close() } diff --git a/storage/local/persistence.go b/storage/local/persistence.go index 8a1c178f5a..22c16eccd2 100644 --- a/storage/local/persistence.go +++ b/storage/local/persistence.go @@ -823,6 +823,7 @@ func (p *persistence) loadSeriesMapAndHeads() (sm *seriesMap, chunksToPersist in } } + headChunkClosed := true // Initial assumption. for i := int64(0); i < numChunkDescs; i++ { if i < persistWatermark { firstTime, err := binary.ReadVarint(r) @@ -844,6 +845,9 @@ func (p *persistence) loadSeriesMapAndHeads() (sm *seriesMap, chunksToPersist in chunkDescsTotal++ } else { // Non-persisted chunk. + // If there are non-persisted chunks at all, we consider + // the head chunk not to be closed yet. + headChunkClosed = false encoding, err := r.ReadByte() if err != nil { log.Warn("Could not decode chunk type:", err) @@ -856,17 +860,17 @@ func (p *persistence) loadSeriesMapAndHeads() (sm *seriesMap, chunksToPersist in p.dirty = true return sm, chunksToPersist, nil } - chunkDescs[i] = newChunkDesc(chunk, chunk.firstTime()) - chunksToPersist++ + cd := newChunkDesc(chunk, chunk.firstTime()) + if i < numChunkDescs-1 { + // This is NOT the head chunk. So it's a chunk + // to be persisted, and we need to populate lastTime. + chunksToPersist++ + cd.maybePopulateLastTime() + } + chunkDescs[i] = cd } } - headChunkClosed := persistWatermark >= numChunkDescs - if !headChunkClosed { - // Head chunk is not ready for persisting yet. 
- chunksToPersist-- - } - fingerprintToSeries[model.Fingerprint(fp)] = &memorySeries{ metric: model.Metric(metric), chunkDescs: chunkDescs, diff --git a/storage/local/persistence_test.go b/storage/local/persistence_test.go index fa3525f774..47cf9078a1 100644 --- a/storage/local/persistence_test.go +++ b/storage/local/persistence_test.go @@ -485,6 +485,12 @@ func testCheckpointAndLoadSeriesMapAndHeads(t *testing.T, encoding chunkEncoding if loadedS1.headChunkClosed { t.Error("headChunkClosed is true") } + if loadedS1.head().chunkFirstTime != 1 { + t.Errorf("want chunkFirstTime in head chunk to be 1, got %d", loadedS1.head().chunkFirstTime) + } + if loadedS1.head().chunkLastTime != model.Earliest { + t.Error("want chunkLastTime in head chunk to be unset") + } } else { t.Errorf("couldn't find %v in loaded map", m1) } @@ -501,6 +507,12 @@ func testCheckpointAndLoadSeriesMapAndHeads(t *testing.T, encoding chunkEncoding if !loadedS3.headChunkClosed { t.Error("headChunkClosed is false") } + if loadedS3.head().chunkFirstTime != 2 { + t.Errorf("want chunkFirstTime in head chunk to be 2, got %d", loadedS3.head().chunkFirstTime) + } + if loadedS3.head().chunkLastTime != 2 { + t.Errorf("want chunkLastTime in head chunk to be 2, got %d", loadedS3.head().chunkLastTime) + } } else { t.Errorf("couldn't find %v in loaded map", m3) } @@ -526,6 +538,27 @@ func testCheckpointAndLoadSeriesMapAndHeads(t *testing.T, encoding chunkEncoding if loadedS4.headChunkClosed { t.Error("headChunkClosed is true") } + for i, cd := range loadedS4.chunkDescs { + if cd.chunkFirstTime != cd.c.firstTime() { + t.Errorf( + "chunkDesc[%d]: chunkFirstTime not consistent with chunk, want %d, got %d", + i, cd.c.firstTime(), cd.chunkFirstTime, + ) + } + if i == len(loadedS4.chunkDescs)-1 { + // Head chunk. + if cd.chunkLastTime != model.Earliest { + t.Error("want chunkLastTime in head chunk to be unset") + } + continue + } + if cd.chunkLastTime != cd.c.newIterator().lastTimestamp() { + t.Errorf( + "chunkDesc[%d]: chunkLastTime not consistent with chunk, want %d, got %d", + i, cd.c.newIterator().lastTimestamp(), cd.chunkLastTime, + ) + } + } } else { t.Errorf("couldn't find %v in loaded map", m4) } @@ -551,6 +584,34 @@ func testCheckpointAndLoadSeriesMapAndHeads(t *testing.T, encoding chunkEncoding if loadedS5.headChunkClosed { t.Error("headChunkClosed is true") } + for i, cd := range loadedS5.chunkDescs { + if i < 3 { + // Evicted chunks. + if cd.chunkFirstTime == model.Earliest { + t.Errorf("chunkDesc[%d]: chunkLastTime not set", i) + } + continue + } + if cd.chunkFirstTime != cd.c.firstTime() { + t.Errorf( + "chunkDesc[%d]: chunkFirstTime not consistent with chunk, want %d, got %d", + i, cd.c.firstTime(), cd.chunkFirstTime, + ) + } + if i == len(loadedS5.chunkDescs)-1 { + // Head chunk. 
+ if cd.chunkLastTime != model.Earliest { + t.Error("want chunkLastTime in head chunk to be unset") + } + continue + } + if cd.chunkLastTime != cd.c.newIterator().lastTimestamp() { + t.Errorf( + "chunkDesc[%d]: chunkLastTime not consistent with chunk, want %d, got %d", + i, cd.c.newIterator().lastTimestamp(), cd.chunkLastTime, + ) + } + } } else { t.Errorf("couldn't find %v in loaded map", m5) } diff --git a/storage/local/preload.go b/storage/local/preload.go index 0fc5030d7a..08a88875f7 100644 --- a/storage/local/preload.go +++ b/storage/local/preload.go @@ -13,11 +13,7 @@ package local -import ( - "time" - - "github.com/prometheus/common/model" -) +import "github.com/prometheus/common/model" // memorySeriesPreloader is a Preloader for the memorySeriesStorage. type memorySeriesPreloader struct { @@ -29,74 +25,15 @@ type memorySeriesPreloader struct { func (p *memorySeriesPreloader) PreloadRange( fp model.Fingerprint, from model.Time, through model.Time, - stalenessDelta time.Duration, -) error { - cds, err := p.storage.preloadChunksForRange(fp, from, through, stalenessDelta) +) (SeriesIterator, error) { + cds, iter, err := p.storage.preloadChunksForRange(fp, from, through) if err != nil { - return err + return nil, err } p.pinnedChunkDescs = append(p.pinnedChunkDescs, cds...) - return nil + return iter, nil } -/* -// MetricAtTime implements Preloader. -func (p *memorySeriesPreloader) MetricAtTime(fp model.Fingerprint, t model.Time) error { - cds, err := p.storage.preloadChunks(fp, &timeSelector{ - from: t, - through: t, - }) - if err != nil { - return err - } - p.pinnedChunkDescs = append(p.pinnedChunkDescs, cds...) - return nil -} - -// MetricAtInterval implements Preloader. -func (p *memorySeriesPreloader) MetricAtInterval(fp model.Fingerprint, from, through model.Time, interval time.Duration) error { - cds, err := p.storage.preloadChunks(fp, &timeSelector{ - from: from, - through: through, - interval: interval, - }) - if err != nil { - return err - } - p.pinnedChunkDescs = append(p.pinnedChunkDescs, cds...) - return -} - -// MetricRange implements Preloader. -func (p *memorySeriesPreloader) MetricRange(fp model.Fingerprint, t model.Time, rangeDuration time.Duration) error { - cds, err := p.storage.preloadChunks(fp, &timeSelector{ - from: t, - through: t, - rangeDuration: through.Sub(from), - }) - if err != nil { - return err - } - p.pinnedChunkDescs = append(p.pinnedChunkDescs, cds...) - return -} - -// MetricRangeAtInterval implements Preloader. -func (p *memorySeriesPreloader) MetricRangeAtInterval(fp model.Fingerprint, from, through model.Time, interval, rangeDuration time.Duration) error { - cds, err := p.storage.preloadChunks(fp, &timeSelector{ - from: from, - through: through, - interval: interval, - rangeDuration: rangeDuration, - }) - if err != nil { - return err - } - p.pinnedChunkDescs = append(p.pinnedChunkDescs, cds...) - return -} -*/ - // Close implements Preloader. func (p *memorySeriesPreloader) Close() { for _, cd := range p.pinnedChunkDescs { diff --git a/storage/local/series.go b/storage/local/series.go index d731548810..6943e925a9 100644 --- a/storage/local/series.go +++ b/storage/local/series.go @@ -315,7 +315,7 @@ func (s *memorySeries) dropChunks(t model.Time) { // preloadChunks is an internal helper method. 
func (s *memorySeries) preloadChunks( indexes []int, fp model.Fingerprint, mss *memorySeriesStorage, -) ([]*chunkDesc, error) { +) ([]*chunkDesc, SeriesIterator, error) { loadIndexes := []int{} pinnedChunkDescs := make([]*chunkDesc, 0, len(indexes)) for _, idx := range indexes { @@ -339,52 +339,47 @@ func (s *memorySeries) preloadChunks( cd.unpin(mss.evictRequests) } chunkOps.WithLabelValues(unpin).Add(float64(len(pinnedChunkDescs))) - return nil, err + return nil, nopIter, err } for i, c := range chunks { s.chunkDescs[loadIndexes[i]].setChunk(c) } } - return pinnedChunkDescs, nil -} -/* -func (s *memorySeries) preloadChunksAtTime(t model.Time, p *persistence) (chunkDescs, error) { - s.mtx.Lock() - defer s.mtx.Unlock() - - if len(s.chunkDescs) == 0 { - return nil, nil + if !s.headChunkClosed && indexes[len(indexes)-1] == len(s.chunkDescs)-1 { + s.headChunkUsedByIterator = true } - var pinIndexes []int - // Find first chunk where lastTime() is after or equal to t. - i := sort.Search(len(s.chunkDescs), func(i int) bool { - return !s.chunkDescs[i].lastTime().Before(t) - }) - switch i { - case 0: - pinIndexes = []int{0} - case len(s.chunkDescs): - pinIndexes = []int{i - 1} - default: - if s.chunkDescs[i].contains(t) { - pinIndexes = []int{i} - } else { - pinIndexes = []int{i - 1, i} - } + iter := &boundedIterator{ + it: s.newIterator(pinnedChunkDescs), + start: model.Now().Add(-mss.dropAfter), } - return s.preloadChunks(pinIndexes, p) + return pinnedChunkDescs, iter, nil +} + +// newIterator returns a new SeriesIterator for the provided chunkDescs (which +// must be pinned). The caller must have locked the fingerprint of the +// memorySeries. +func (s *memorySeries) newIterator(pinnedChunkDescs []*chunkDesc) SeriesIterator { + chunks := make([]chunk, 0, len(pinnedChunkDescs)) + for _, cd := range pinnedChunkDescs { + // It's OK to directly access cd.c here (without locking) as the + // series FP is locked and the chunk is pinned. + chunks = append(chunks, cd.c) + } + return &memorySeriesIterator{ + chunks: chunks, + chunkIts: make([]chunkIterator, len(chunks)), + } } -*/ // preloadChunksForRange loads chunks for the given range from the persistence. // The caller must have locked the fingerprint of the series. func (s *memorySeries) preloadChunksForRange( from model.Time, through model.Time, fp model.Fingerprint, mss *memorySeriesStorage, -) ([]*chunkDesc, error) { +) ([]*chunkDesc, SeriesIterator, error) { firstChunkDescTime := model.Latest if len(s.chunkDescs) > 0 { firstChunkDescTime = s.chunkDescs[0].firstTime() @@ -392,15 +387,16 @@ func (s *memorySeries) preloadChunksForRange( if s.chunkDescsOffset != 0 && from.Before(firstChunkDescTime) { cds, err := mss.loadChunkDescs(fp, s.persistWatermark) if err != nil { - return nil, err + return nil, nopIter, err } s.chunkDescs = append(cds, s.chunkDescs...) s.chunkDescsOffset = 0 s.persistWatermark += len(cds) + firstChunkDescTime = s.chunkDescs[0].firstTime() } - if len(s.chunkDescs) == 0 { - return nil, nil + if len(s.chunkDescs) == 0 || through.Before(firstChunkDescTime) { + return nil, nopIter, nil } // Find first chunk with start time after "from". @@ -411,6 +407,13 @@ func (s *memorySeries) preloadChunksForRange( throughIdx := sort.Search(len(s.chunkDescs), func(i int) bool { return s.chunkDescs[i].firstTime().After(through) }) + if fromIdx == len(s.chunkDescs) { + // Even the last chunk starts before "from". Find out if the + // series ends before "from" and we don't need to do anything. 
+ if s.chunkDescs[len(s.chunkDescs)-1].lastTime().Before(from) { + return nil, nopIter, nil + } + } if fromIdx > 0 { fromIdx-- } @@ -425,25 +428,6 @@ func (s *memorySeries) preloadChunksForRange( return s.preloadChunks(pinIndexes, fp, mss) } -// newIterator returns a new SeriesIterator. The caller must have locked the -// fingerprint of the memorySeries. -func (s *memorySeries) newIterator() SeriesIterator { - chunks := make([]chunk, 0, len(s.chunkDescs)) - for i, cd := range s.chunkDescs { - if chunk := cd.chunk(); chunk != nil { - if i == len(s.chunkDescs)-1 && !s.headChunkClosed { - s.headChunkUsedByIterator = true - } - chunks = append(chunks, chunk) - } - } - - return &memorySeriesIterator{ - chunks: chunks, - chunkIts: make([]chunkIterator, len(chunks)), - } -} - // head returns a pointer to the head chunk descriptor. The caller must have // locked the fingerprint of the memorySeries. This method will panic if this // series has no chunk descriptors. @@ -482,70 +466,33 @@ func (s *memorySeries) chunksToPersist() []*chunkDesc { // memorySeriesIterator implements SeriesIterator. type memorySeriesIterator struct { - chunkIt chunkIterator // Last chunkIterator used by ValueAtTime. + chunkIt chunkIterator // Last chunkIterator used by ValueAtOrBeforeTime. chunkIts []chunkIterator // Caches chunkIterators. chunks []chunk } -// ValueAtTime implements SeriesIterator. -func (it *memorySeriesIterator) ValueAtTime(t model.Time) []model.SamplePair { +// ValueAtOrBeforeTime implements SeriesIterator. +func (it *memorySeriesIterator) ValueAtOrBeforeTime(t model.Time) model.SamplePair { // The most common case. We are iterating through a chunk. if it.chunkIt != nil && it.chunkIt.contains(t) { - return it.chunkIt.valueAtTime(t) + return it.chunkIt.valueAtOrBeforeTime(t) } if len(it.chunks) == 0 { - return nil + return model.SamplePair{Timestamp: model.Earliest} } - // Before or exactly on the first sample of the series. - it.chunkIt = it.chunkIterator(0) - ts := it.chunkIt.timestampAtIndex(0) - if !t.After(ts) { - // return first value of first chunk - return []model.SamplePair{{ - Timestamp: ts, - Value: it.chunkIt.sampleValueAtIndex(0), - }} - } - - // After or exactly on the last sample of the series. - it.chunkIt = it.chunkIterator(len(it.chunks) - 1) - ts = it.chunkIt.lastTimestamp() - if !t.Before(ts) { - // return last value of last chunk - return []model.SamplePair{{ - Timestamp: ts, - Value: it.chunkIt.sampleValueAtIndex(it.chunkIt.length() - 1), - }} - } - - // Find last chunk where firstTime() is before or equal to t. + // Find the last chunk where firstTime() is before or equal to t. l := len(it.chunks) - 1 i := sort.Search(len(it.chunks), func(i int) bool { return !it.chunks[l-i].firstTime().After(t) }) if i == len(it.chunks) { - panic("out of bounds") + // Even the first chunk starts after t. + return model.SamplePair{Timestamp: model.Earliest} } it.chunkIt = it.chunkIterator(l - i) - ts = it.chunkIt.lastTimestamp() - if t.After(ts) { - // We ended up between two chunks. - sp1 := model.SamplePair{ - Timestamp: ts, - Value: it.chunkIt.sampleValueAtIndex(it.chunkIt.length() - 1), - } - it.chunkIt = it.chunkIterator(l - i + 1) - return []model.SamplePair{ - sp1, - { - Timestamp: it.chunkIt.timestampAtIndex(0), - Value: it.chunkIt.sampleValueAtIndex(0), - }, - } - } - return it.chunkIt.valueAtTime(t) + return it.chunkIt.valueAtOrBeforeTime(t) } // BoundaryValues implements SeriesIterator. 
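Note (illustrative, not part of the patch): the at-or-before lookup introduced above for the delta and double-delta chunk iterators, and reused by memorySeriesIterator.ValueAtOrBeforeTime, follows one pattern — sort.Search for the first timestamp strictly after t, take the preceding sample, and use model.Earliest as the "no applicable sample" sentinel. A minimal self-contained sketch of that semantics, using a stand-in samplePair type and sentinel instead of the real model.SamplePair and model.Earliest:

package main

import (
	"fmt"
	"sort"
)

// samplePair is a stand-in for model.SamplePair used only in this sketch.
type samplePair struct {
	Timestamp int64
	Value     float64
}

// earliest mimics the model.Earliest sentinel for "no sample found".
const earliest = int64(-1 << 62)

// valueAtOrBeforeTime returns the sample closest before or exactly at t.
// If no sample at or before t exists, the returned Timestamp is the sentinel.
func valueAtOrBeforeTime(samples []samplePair, t int64) samplePair {
	// Index of the first sample strictly after t.
	i := sort.Search(len(samples), func(i int) bool {
		return samples[i].Timestamp > t
	})
	if i == 0 {
		return samplePair{Timestamp: earliest}
	}
	return samples[i-1]
}

func main() {
	samples := []samplePair{{10, 1}, {20, 2}, {30, 3}}
	fmt.Println(valueAtOrBeforeTime(samples, 20)) // exactly at a sample: {20 2}
	fmt.Println(valueAtOrBeforeTime(samples, 25)) // between samples: {20 2}
	fmt.Println(valueAtOrBeforeTime(samples, 5))  // before the first sample: sentinel
}

Returning a single sentinel-tagged value instead of a slice of up to two candidates is what lets the evaluator apply the staleness check with a single timestamp comparison.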
@@ -578,18 +525,24 @@ func (it *memorySeriesIterator) BoundaryValues(in metric.Interval) []model.Sampl } chunkIt := it.chunkIterator(i + j) if len(values) == 0 { - firstValues := chunkIt.valueAtTime(in.OldestInclusive) - switch len(firstValues) { - case 2: - values = append(values, firstValues[1]) - case 1: - values = firstValues - default: - panic("unexpected return from valueAtTime") + for s := range chunkIt.values() { + if len(values) == 0 && !s.Timestamp.Before(in.OldestInclusive) { + values = append(values, *s) + // We cannot just break out here as we have to consume all + // the values to not leak a goroutine. This could obviously + // be made much neater with more suitable methods in the chunk + // interface. But currently, BoundaryValues is only used by + // `predict_linear` so we would pollute the chunk interface + // unduly just for one single corner case. Plus, even that use + // of BoundaryValues is suboptimal and should be replaced. + } } } if chunkIt.lastTimestamp().After(in.NewestInclusive) { - values = append(values, chunkIt.valueAtTime(in.NewestInclusive)[0]) + s := chunkIt.valueAtOrBeforeTime(in.NewestInclusive) + if s.Timestamp != model.Earliest { + values = append(values, s) + } break } } @@ -644,8 +597,8 @@ func (it *memorySeriesIterator) chunkIterator(i int) chunkIterator { type nopSeriesIterator struct{} // ValueAtTime implements SeriesIterator. -func (i nopSeriesIterator) ValueAtTime(t model.Time) []model.SamplePair { - return []model.SamplePair{} +func (i nopSeriesIterator) ValueAtOrBeforeTime(t model.Time) model.SamplePair { + return model.SamplePair{Timestamp: model.Earliest} } // BoundaryValues implements SeriesIterator. @@ -657,3 +610,5 @@ func (i nopSeriesIterator) BoundaryValues(in metric.Interval) []model.SamplePair func (i nopSeriesIterator) RangeValues(in metric.Interval) []model.SamplePair { return []model.SamplePair{} } + +var nopIter nopSeriesIterator // A nopSeriesIterator for convenience. Can be shared. diff --git a/storage/local/storage.go b/storage/local/storage.go index bb1c4eda9e..36cef69eaf 100644 --- a/storage/local/storage.go +++ b/storage/local/storage.go @@ -347,26 +347,6 @@ func (s *memorySeriesStorage) WaitForIndexing() { s.persistence.waitForIndexing() } -// NewIterator implements Storage. -func (s *memorySeriesStorage) NewIterator(fp model.Fingerprint) SeriesIterator { - s.fpLocker.Lock(fp) - defer s.fpLocker.Unlock(fp) - - series, ok := s.fpToSeries.get(fp) - if !ok { - // Oops, no series for fp found. That happens if, after - // preloading is done, the whole series is identified as old - // enough for purging and hence purged for good. As there is no - // data left to iterate over, return an iterator that will never - // return any values. - return nopSeriesIterator{} - } - return &boundedIterator{ - it: series.newIterator(), - start: model.Now().Add(-s.dropAfter), - } -} - // LastSampleForFingerprint implements Storage. func (s *memorySeriesStorage) LastSamplePairForFingerprint(fp model.Fingerprint) *model.SamplePair { s.fpLocker.Lock(fp) @@ -386,12 +366,12 @@ type boundedIterator struct { start model.Time } -// ValueAtTime implements the SeriesIterator interface. -func (bit *boundedIterator) ValueAtTime(ts model.Time) []model.SamplePair { +// ValueAtOrBeforeTime implements the SeriesIterator interface. 
+func (bit *boundedIterator) ValueAtOrBeforeTime(ts model.Time) model.SamplePair { if ts < bit.start { - return []model.SamplePair{} + return model.SamplePair{Timestamp: model.Earliest} } - return bit.it.ValueAtTime(ts) + return bit.it.ValueAtOrBeforeTime(ts) } // BoundaryValues implements the SeriesIterator interface. @@ -571,6 +551,8 @@ func (s *memorySeriesStorage) DropMetricsForFingerprints(fps ...model.Fingerprin } } +// ErrOutOfOrderSample is returned if a sample has a timestamp before the latest +// timestamp in the series it is appended to. var ErrOutOfOrderSample = fmt.Errorf("sample timestamp out of order") // Append implements Storage. @@ -707,8 +689,7 @@ func (s *memorySeriesStorage) getOrCreateSeries(fp model.Fingerprint, m model.Me func (s *memorySeriesStorage) preloadChunksForRange( fp model.Fingerprint, from model.Time, through model.Time, - stalenessDelta time.Duration, -) ([]*chunkDesc, error) { +) ([]*chunkDesc, SeriesIterator, error) { s.fpLocker.Lock(fp) defer s.fpLocker.Unlock(fp) @@ -716,20 +697,20 @@ func (s *memorySeriesStorage) preloadChunksForRange( if !ok { has, first, last, err := s.persistence.hasArchivedMetric(fp) if err != nil { - return nil, err + return nil, nopIter, err } if !has { s.invalidPreloadRequestsCount.Inc() - return nil, nil + return nil, nopIter, nil } - if from.Add(-stalenessDelta).Before(last) && through.Add(stalenessDelta).After(first) { + if from.Before(last) && through.After(first) { metric, err := s.persistence.archivedMetric(fp) if err != nil { - return nil, err + return nil, nopIter, err } series = s.getOrCreateSeries(fp, metric) } else { - return nil, nil + return nil, nopIter, nil } } return series.preloadChunksForRange(from, through, fp, s) diff --git a/storage/local/storage_test.go b/storage/local/storage_test.go index 74bb631535..4c7034f646 100644 --- a/storage/local/storage_test.go +++ b/storage/local/storage_test.go @@ -405,19 +405,17 @@ func TestRetentionCutoff(t *testing.T) { defer pl.Close() // Preload everything. - err := pl.PreloadRange(fp, insertStart, now, 5*time.Minute) + it, err := pl.PreloadRange(fp, insertStart, now) if err != nil { t.Fatalf("Error preloading outdated chunks: %s", err) } - it := s.NewIterator(fp) - - vals := it.ValueAtTime(now.Add(-61 * time.Minute)) - if len(vals) != 0 { + val := it.ValueAtOrBeforeTime(now.Add(-61 * time.Minute)) + if val.Timestamp != model.Earliest { t.Errorf("unexpected result for timestamp before retention period") } - vals = it.RangeValues(metric.Interval{OldestInclusive: insertStart, NewestInclusive: now}) + vals := it.RangeValues(metric.Interval{OldestInclusive: insertStart, NewestInclusive: now}) // We get 59 values here because the model.Now() is slightly later // than our now. 
if len(vals) != 59 { @@ -502,11 +500,18 @@ func TestDropMetrics(t *testing.T) { t.Errorf("unexpected number of fingerprints: %d", len(fps2)) } - it := s.NewIterator(fpList[0]) + _, it, err := s.preloadChunksForRange(fpList[0], model.Earliest, model.Latest) + if err != nil { + t.Fatalf("Error preloading everything: %s", err) + } if vals := it.RangeValues(metric.Interval{OldestInclusive: insertStart, NewestInclusive: now}); len(vals) != 0 { t.Errorf("unexpected number of samples: %d", len(vals)) } - it = s.NewIterator(fpList[1]) + + _, it, err = s.preloadChunksForRange(fpList[1], model.Earliest, model.Latest) + if err != nil { + t.Fatalf("Error preloading everything: %s", err) + } if vals := it.RangeValues(metric.Interval{OldestInclusive: insertStart, NewestInclusive: now}); len(vals) != N { t.Errorf("unexpected number of samples: %d", len(vals)) } @@ -528,11 +533,18 @@ func TestDropMetrics(t *testing.T) { t.Errorf("unexpected number of fingerprints: %d", len(fps3)) } - it = s.NewIterator(fpList[0]) + _, it, err = s.preloadChunksForRange(fpList[0], model.Earliest, model.Latest) + if err != nil { + t.Fatalf("Error preloading everything: %s", err) + } if vals := it.RangeValues(metric.Interval{OldestInclusive: insertStart, NewestInclusive: now}); len(vals) != 0 { t.Errorf("unexpected number of samples: %d", len(vals)) } - it = s.NewIterator(fpList[1]) + + _, it, err = s.preloadChunksForRange(fpList[1], model.Earliest, model.Latest) + if err != nil { + t.Fatalf("Error preloading everything: %s", err) + } if vals := it.RangeValues(metric.Interval{OldestInclusive: insertStart, NewestInclusive: now}); len(vals) != 0 { t.Errorf("unexpected number of samples: %d", len(vals)) } @@ -640,7 +652,7 @@ func TestChunkType1(t *testing.T) { testChunk(t, 1) } -func testValueAtTime(t *testing.T, encoding chunkEncoding) { +func testValueAtOrBeforeTime(t *testing.T, encoding chunkEncoding) { samples := make(model.Samples, 10000) for i := range samples { samples[i] = &model.Sample{ @@ -658,82 +670,66 @@ func testValueAtTime(t *testing.T, encoding chunkEncoding) { fp := model.Metric{}.FastFingerprint() - it := s.NewIterator(fp) + _, it, err := s.preloadChunksForRange(fp, model.Earliest, model.Latest) + if err != nil { + t.Fatalf("Error preloading everything: %s", err) + } // #1 Exactly on a sample. for i, expected := range samples { - actual := it.ValueAtTime(expected.Timestamp) + actual := it.ValueAtOrBeforeTime(expected.Timestamp) - if len(actual) != 1 { - t.Fatalf("1.%d. Expected exactly one result, got %d.", i, len(actual)) + if expected.Timestamp != actual.Timestamp { + t.Errorf("1.%d. Got %v; want %v", i, actual.Timestamp, expected.Timestamp) } - if expected.Timestamp != actual[0].Timestamp { - t.Errorf("1.%d. Got %v; want %v", i, actual[0].Timestamp, expected.Timestamp) - } - if expected.Value != actual[0].Value { - t.Errorf("1.%d. Got %v; want %v", i, actual[0].Value, expected.Value) + if expected.Value != actual.Value { + t.Errorf("1.%d. Got %v; want %v", i, actual.Value, expected.Value) } } // #2 Between samples. - for i, expected1 := range samples { + for i, expected := range samples { if i == len(samples)-1 { continue } - expected2 := samples[i+1] - actual := it.ValueAtTime(expected1.Timestamp + 1) + actual := it.ValueAtOrBeforeTime(expected.Timestamp + 1) - if len(actual) != 2 { - t.Fatalf("2.%d. Expected exactly 2 results, got %d.", i, len(actual)) + if expected.Timestamp != actual.Timestamp { + t.Errorf("2.%d. 
Got %v; want %v", i, actual.Timestamp, expected.Timestamp) } - if expected1.Timestamp != actual[0].Timestamp { - t.Errorf("2.%d. Got %v; want %v", i, actual[0].Timestamp, expected1.Timestamp) - } - if expected1.Value != actual[0].Value { - t.Errorf("2.%d. Got %v; want %v", i, actual[0].Value, expected1.Value) - } - if expected2.Timestamp != actual[1].Timestamp { - t.Errorf("2.%d. Got %v; want %v", i, actual[1].Timestamp, expected1.Timestamp) - } - if expected2.Value != actual[1].Value { - t.Errorf("2.%d. Got %v; want %v", i, actual[1].Value, expected1.Value) + if expected.Value != actual.Value { + t.Errorf("2.%d. Got %v; want %v", i, actual.Value, expected.Value) } } // #3 Corner cases: Just before the first sample, just after the last. - expected := samples[0] - actual := it.ValueAtTime(expected.Timestamp - 1) - if len(actual) != 1 { - t.Fatalf("3.1. Expected exactly one result, got %d.", len(actual)) + expected := &model.Sample{Timestamp: model.Earliest} + actual := it.ValueAtOrBeforeTime(samples[0].Timestamp - 1) + if expected.Timestamp != actual.Timestamp { + t.Errorf("3.1. Got %v; want %v", actual.Timestamp, expected.Timestamp) } - if expected.Timestamp != actual[0].Timestamp { - t.Errorf("3.1. Got %v; want %v", actual[0].Timestamp, expected.Timestamp) - } - if expected.Value != actual[0].Value { - t.Errorf("3.1. Got %v; want %v", actual[0].Value, expected.Value) + if expected.Value != actual.Value { + t.Errorf("3.1. Got %v; want %v", actual.Value, expected.Value) } expected = samples[len(samples)-1] - actual = it.ValueAtTime(expected.Timestamp + 1) - if len(actual) != 1 { - t.Fatalf("3.2. Expected exactly one result, got %d.", len(actual)) + actual = it.ValueAtOrBeforeTime(expected.Timestamp + 1) + if expected.Timestamp != actual.Timestamp { + t.Errorf("3.2. Got %v; want %v", actual.Timestamp, expected.Timestamp) } - if expected.Timestamp != actual[0].Timestamp { - t.Errorf("3.2. Got %v; want %v", actual[0].Timestamp, expected.Timestamp) - } - if expected.Value != actual[0].Value { - t.Errorf("3.2. Got %v; want %v", actual[0].Value, expected.Value) + if expected.Value != actual.Value { + t.Errorf("3.2. Got %v; want %v", actual.Value, expected.Value) } } func TestValueAtTimeChunkType0(t *testing.T) { - testValueAtTime(t, 0) + testValueAtOrBeforeTime(t, 0) } func TestValueAtTimeChunkType1(t *testing.T) { - testValueAtTime(t, 1) + testValueAtOrBeforeTime(t, 1) } -func benchmarkValueAtTime(b *testing.B, encoding chunkEncoding) { +func benchmarkValueAtOrBeforeTime(b *testing.B, encoding chunkEncoding) { samples := make(model.Samples, 10000) for i := range samples { samples[i] = &model.Sample{ @@ -751,59 +747,67 @@ func benchmarkValueAtTime(b *testing.B, encoding chunkEncoding) { fp := model.Metric{}.FastFingerprint() + _, it, err := s.preloadChunksForRange(fp, model.Earliest, model.Latest) + if err != nil { + b.Fatalf("Error preloading everything: %s", err) + } + b.ResetTimer() for i := 0; i < b.N; i++ { - it := s.NewIterator(fp) - // #1 Exactly on a sample. for i, expected := range samples { - actual := it.ValueAtTime(expected.Timestamp) + actual := it.ValueAtOrBeforeTime(expected.Timestamp) - if len(actual) != 1 { - b.Fatalf("1.%d. Expected exactly one result, got %d.", i, len(actual)) + if expected.Timestamp != actual.Timestamp { + b.Errorf("1.%d. Got %v; want %v", i, actual.Timestamp, expected.Timestamp) } - if expected.Timestamp != actual[0].Timestamp { - b.Errorf("1.%d. 
Got %v; want %v", i, actual[0].Timestamp, expected.Timestamp) - } - if expected.Value != actual[0].Value { - b.Errorf("1.%d. Got %v; want %v", i, actual[0].Value, expected.Value) + if expected.Value != actual.Value { + b.Errorf("1.%d. Got %v; want %v", i, actual.Value, expected.Value) } } // #2 Between samples. - for i, expected1 := range samples { + for i, expected := range samples { if i == len(samples)-1 { continue } - expected2 := samples[i+1] - actual := it.ValueAtTime(expected1.Timestamp + 1) + actual := it.ValueAtOrBeforeTime(expected.Timestamp + 1) - if len(actual) != 2 { - b.Fatalf("2.%d. Expected exactly 2 results, got %d.", i, len(actual)) + if expected.Timestamp != actual.Timestamp { + b.Errorf("2.%d. Got %v; want %v", i, actual.Timestamp, expected.Timestamp) } - if expected1.Timestamp != actual[0].Timestamp { - b.Errorf("2.%d. Got %v; want %v", i, actual[0].Timestamp, expected1.Timestamp) - } - if expected1.Value != actual[0].Value { - b.Errorf("2.%d. Got %v; want %v", i, actual[0].Value, expected1.Value) - } - if expected2.Timestamp != actual[1].Timestamp { - b.Errorf("2.%d. Got %v; want %v", i, actual[1].Timestamp, expected1.Timestamp) - } - if expected2.Value != actual[1].Value { - b.Errorf("2.%d. Got %v; want %v", i, actual[1].Value, expected1.Value) + if expected.Value != actual.Value { + b.Errorf("2.%d. Got %v; want %v", i, actual.Value, expected.Value) } } + + // #3 Corner cases: Just before the first sample, just after the last. + expected := &model.Sample{Timestamp: model.Earliest} + actual := it.ValueAtOrBeforeTime(samples[0].Timestamp - 1) + if expected.Timestamp != actual.Timestamp { + b.Errorf("3.1. Got %v; want %v", actual.Timestamp, expected.Timestamp) + } + if expected.Value != actual.Value { + b.Errorf("3.1. Got %v; want %v", actual.Value, expected.Value) + } + expected = samples[len(samples)-1] + actual = it.ValueAtOrBeforeTime(expected.Timestamp + 1) + if expected.Timestamp != actual.Timestamp { + b.Errorf("3.2. Got %v; want %v", actual.Timestamp, expected.Timestamp) + } + if expected.Value != actual.Value { + b.Errorf("3.2. Got %v; want %v", actual.Value, expected.Value) + } } } -func BenchmarkValueAtTimeChunkType0(b *testing.B) { - benchmarkValueAtTime(b, 0) +func BenchmarkValueAtOrBeforeTimeChunkType0(b *testing.B) { + benchmarkValueAtOrBeforeTime(b, 0) } func BenchmarkValueAtTimeChunkType1(b *testing.B) { - benchmarkValueAtTime(b, 1) + benchmarkValueAtOrBeforeTime(b, 1) } func testRangeValues(t *testing.T, encoding chunkEncoding) { @@ -824,7 +828,10 @@ func testRangeValues(t *testing.T, encoding chunkEncoding) { fp := model.Metric{}.FastFingerprint() - it := s.NewIterator(fp) + _, it, err := s.preloadChunksForRange(fp, model.Earliest, model.Latest) + if err != nil { + t.Fatalf("Error preloading everything: %s", err) + } // #1 Zero length interval at sample. for i, expected := range samples { @@ -976,12 +983,14 @@ func benchmarkRangeValues(b *testing.B, encoding chunkEncoding) { fp := model.Metric{}.FastFingerprint() + _, it, err := s.preloadChunksForRange(fp, model.Earliest, model.Latest) + if err != nil { + b.Fatalf("Error preloading everything: %s", err) + } + b.ResetTimer() for i := 0; i < b.N; i++ { - - it := s.NewIterator(fp) - for _, sample := range samples { actual := it.RangeValues(metric.Interval{ OldestInclusive: sample.Timestamp - 20, @@ -1023,7 +1032,10 @@ func testEvictAndPurgeSeries(t *testing.T, encoding chunkEncoding) { // Drop ~half of the chunks. 
s.maintainMemorySeries(fp, 10000) - it := s.NewIterator(fp) + _, it, err := s.preloadChunksForRange(fp, model.Earliest, model.Latest) + if err != nil { + t.Fatalf("Error preloading everything: %s", err) + } actual := it.BoundaryValues(metric.Interval{ OldestInclusive: 0, NewestInclusive: 100000, @@ -1041,7 +1053,10 @@ func testEvictAndPurgeSeries(t *testing.T, encoding chunkEncoding) { // Drop everything. s.maintainMemorySeries(fp, 100000) - it = s.NewIterator(fp) + _, it, err = s.preloadChunksForRange(fp, model.Earliest, model.Latest) + if err != nil { + t.Fatalf("Error preloading everything: %s", err) + } actual = it.BoundaryValues(metric.Interval{ OldestInclusive: 0, NewestInclusive: 100000, @@ -1215,7 +1230,7 @@ func testEvictAndLoadChunkDescs(t *testing.T, encoding chunkEncoding) { // Load everything back. p := s.NewPreloader() - p.PreloadRange(fp, 0, 100000, time.Hour) + p.PreloadRange(fp, 0, 100000) if oldLen != len(series.chunkDescs) { t.Errorf("Expected number of chunkDescs to have reached old value again, old number %d, current number %d.", oldLen, len(series.chunkDescs)) @@ -1513,20 +1528,21 @@ func verifyStorage(t testing.TB, s *memorySeriesStorage, samples model.Samples, t.Fatal(err) } p := s.NewPreloader() - p.PreloadRange(fp, sample.Timestamp, sample.Timestamp, time.Hour) - found := s.NewIterator(fp).ValueAtTime(sample.Timestamp) - if len(found) != 1 { - t.Errorf("Sample %#v: Expected exactly one value, found %d.", sample, len(found)) + it, err := p.PreloadRange(fp, sample.Timestamp, sample.Timestamp) + if err != nil { + t.Fatal(err) + } + found := it.ValueAtOrBeforeTime(sample.Timestamp) + if found.Timestamp == model.Earliest { + t.Errorf("Sample %#v: Expected sample not found.", sample) result = false p.Close() continue } - want := sample.Value - got := found[0].Value - if want != got || sample.Timestamp != found[0].Timestamp { + if sample.Value != found.Value || sample.Timestamp != found.Timestamp { t.Errorf( "Value (or timestamp) mismatch, want %f (at time %v), got %f (at time %v).", - want, sample.Timestamp, got, found[0].Timestamp, + sample.Value, sample.Timestamp, found.Value, found.Timestamp, ) result = false } @@ -1559,13 +1575,11 @@ func TestAppendOutOfOrder(t *testing.T) { pl := s.NewPreloader() defer pl.Close() - err = pl.PreloadRange(fp, 0, 2, 5*time.Minute) + it, err := pl.PreloadRange(fp, 0, 2) if err != nil { t.Fatalf("Error preloading chunks: %s", err) } - it := s.NewIterator(fp) - want := []model.SamplePair{ { Timestamp: 0,
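Note (illustrative, not part of the patch): the corner case handled in Analyzer.Prepare at the top of this patch — the same series used both as an instant and as a range selector — boils down to extending the preload window backwards by whichever is larger, rangeDuration or StalenessDelta. A minimal sketch with made-up values; preloadStart is a hypothetical helper, not an identifier from this change:

package main

import (
	"fmt"
	"time"
)

// stalenessDelta mirrors promql.StalenessDelta (5 minutes).
const stalenessDelta = 5 * time.Minute

// preloadStart shows how far back the preload window must reach: at least
// rangeDuration for the range selector, but never less than stalenessDelta,
// so an instant selector on the same series still finds the sample closest
// before the reference time.
func preloadStart(start time.Time, rangeDuration time.Duration) time.Time {
	if stalenessDelta > rangeDuration {
		return start.Add(-stalenessDelta)
	}
	return start.Add(-rangeDuration)
}

func main() {
	start := time.Unix(10000, 0)
	fmt.Println(preloadStart(start, 30*time.Second)) // short range: extended to 5m before start
	fmt.Println(preloadStart(start, 10*time.Minute)) // long range: the range itself suffices
}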