From 0e202dacb41604f97d9de711bc1d35f717a1f675 Mon Sep 17 00:00:00 2001 From: beorn7 Date: Tue, 16 Feb 2016 18:47:50 +0100 Subject: [PATCH 1/5] Streamline series iterator creation This will fix issue #1035 and will also help to make issue #1264 less bad. The fundamental problem in the current code: In the preload phase, we quite accurately determine which chunks will be used for the query being executed. However, in the subsequent step of creating series iterators, the created iterators are referencing _all_ in-memory chunks in their series, even the un-pinned ones. In iterator creation, we copy a pointer to each in-memory chunk of a series into the iterator. While this creates a certain amount of allocation churn, the worst thing about it is that copying the chunk pointer out of the chunkDesc requires a mutex acquisition. (Remember that the iterator will also reference un-pinned chunks, so we need to acquire the mutex to protect against concurrent eviction.) The worst case happens if a series doesn't even contain any relevant samples for the query time range. We notice that during preloading but then we will still create a series iterator for it. But even for series that do contain relevant samples, the overhead is quite bad for instant queries that retrieve a single sample from each series, but still go through all the effort of series iterator creation. All of that is particularly bad if a series has many in-memory chunks. This commit addresses the problem from two sides: First, it merges preloading and iterator creation into one step, i.e. the preload call returns an iterator for exactly the preloaded chunks. Second, the required mutex acquisition in chunkDesc has been greatly reduced. That was enabled by a side effect of the first step, which is that the iterator is only referencing pinned chunks, so there is no risk of concurrent eviction anymore, and chunks can be accessed without mutex acquisition. To simplify the code changes for the above, the long-planned change of ValueAtTime to ValueAtOrBefore time was performed at the same time. (It should have been done first, but it kind of accidentally happened while I was in the middle of writing the series iterator changes. Sorry for that.) So far, we actively filtered the up to two values that were returned by ValueAtTime, i.e. we invested work to retrieve up to two values, and then we invested more work to throw one of them away. The SeriesIterator.BoundaryValues method can be removed once #1401 is fixed. But I really didn't want to load even more changes into this PR. Benchmarks: The BenchmarkFuzz.* benchmarks run 83% faster (i.e. about six times faster) and allocate 95% fewer bytes. The reason for that is that the benchmark reads one sample after another from the time series and creates a new series iterator for each sample read. To find out how much these improvements matter in practice, I have mirrored a beefy Prometheus server at SoundCloud that suffers from both issues #1035 and #1264. To reach steady state that would be comparable, the server needs to run for 15d. So far, it has run for 1d. The test server currently has only half as many memory time series and 60% of the memory chunks the main server has. The 90th percentile rule evaluation cycle time is ~11s on the main server and only ~3s on the test server. However, these numbers might get much closer over time. In addition to performance improvements, this commit removes about 150 LOC. --- promql/analyzer.go | 25 +++- promql/engine.go | 34 ++---- promql/functions.go | 5 + storage/local/chunk.go | 106 +++++++++------- storage/local/delta.go | 42 ++----- storage/local/doubledelta.go | 42 ++----- storage/local/interface.go | 26 ++-- storage/local/preload.go | 73 +---------- storage/local/series.go | 173 ++++++++++---------------- storage/local/storage.go | 43 ++----- storage/local/storage_test.go | 220 ++++++++++++++++++---------------- 11 files changed, 325 insertions(+), 464 deletions(-) diff --git a/promql/analyzer.go b/promql/analyzer.go index a10656e1a1..3bc46bb8f0 100644 --- a/promql/analyzer.go +++ b/promql/analyzer.go @@ -125,6 +125,7 @@ func (a *Analyzer) Prepare(ctx context.Context) (local.Preloader, error) { }() // Preload all analyzed ranges. + iters := map[model.Fingerprint]local.SeriesIterator{} for offset, pt := range a.offsetPreloadTimes { start := a.Start.Add(-offset) end := a.End.Add(-offset) @@ -132,19 +133,35 @@ func (a *Analyzer) Prepare(ctx context.Context) (local.Preloader, error) { if err = contextDone(ctx, env); err != nil { return nil, err } - err = p.PreloadRange(fp, start.Add(-rangeDuration), end, StalenessDelta) + startOfRange := start.Add(-rangeDuration) + if StalenessDelta > rangeDuration { + // Cover a weird corner case: The expression + // mixes up instants and ranges for the same + // series. We'll handle that over-all as + // range. But if the rangeDuration is smaller + // than the StalenessDelta, the range wouldn't + // cover everything potentially needed for the + // instant, so we have to extend startOfRange. + startOfRange = start.Add(-StalenessDelta) + } + iter, err := p.PreloadRange(fp, startOfRange, end) if err != nil { return nil, err } + iters[fp] = iter } for fp := range pt.instants { if err = contextDone(ctx, env); err != nil { return nil, err } - err = p.PreloadRange(fp, start, end, StalenessDelta) + // Need to look backwards by StalenessDelta but not + // forward because we always return the closest sample + // _before_ the reference time. + iter, err := p.PreloadRange(fp, start.Add(-StalenessDelta), end) if err != nil { return nil, err } + iters[fp] = iter } } @@ -153,11 +170,11 @@ func (a *Analyzer) Prepare(ctx context.Context) (local.Preloader, error) { switch n := node.(type) { case *VectorSelector: for fp := range n.metrics { - n.iterators[fp] = a.Storage.NewIterator(fp) + n.iterators[fp] = iters[fp] } case *MatrixSelector: for fp := range n.metrics { - n.iterators[fp] = a.Storage.NewIterator(fp) + n.iterators[fp] = iters[fp] } } return true diff --git a/promql/engine.go b/promql/engine.go index 1927a41ab0..599486cae4 100644 --- a/promql/engine.go +++ b/promql/engine.go @@ -688,15 +688,16 @@ func (ev *evaluator) eval(expr Expr) model.Value { func (ev *evaluator) vectorSelector(node *VectorSelector) vector { vec := vector{} for fp, it := range node.iterators { - sampleCandidates := it.ValueAtTime(ev.Timestamp.Add(-node.Offset)) - samplePair := chooseClosestBefore(sampleCandidates, ev.Timestamp.Add(-node.Offset)) - if samplePair != nil { - vec = append(vec, &sample{ - Metric: node.metrics[fp], - Value: samplePair.Value, - Timestamp: ev.Timestamp, - }) + refTime := ev.Timestamp.Add(-node.Offset) + samplePair := it.ValueAtOrBeforeTime(refTime) + if samplePair.Timestamp.Before(refTime.Add(-StalenessDelta)) { + continue // Sample outside of staleness policy window. } + vec = append(vec, &sample{ + Metric: node.metrics[fp], + Value: samplePair.Value, + Timestamp: ev.Timestamp, + }) } return vec } @@ -1168,23 +1169,6 @@ func shouldDropMetricName(op itemType) bool { // series is considered stale. var StalenessDelta = 5 * time.Minute -// chooseClosestBefore chooses the closest sample of a list of samples -// before or at a given target time. -func chooseClosestBefore(samples []model.SamplePair, timestamp model.Time) *model.SamplePair { - for _, candidate := range samples { - delta := candidate.Timestamp.Sub(timestamp) - // Samples before or at target time. - if delta <= 0 { - // Ignore samples outside of staleness policy window. - if -delta > StalenessDelta { - continue - } - return &candidate - } - } - return nil -} - // A queryGate controls the maximum number of concurrently running and waiting queries. type queryGate struct { ch chan struct{} diff --git a/promql/functions.go b/promql/functions.go index 09cde741c2..4728b1e96e 100644 --- a/promql/functions.go +++ b/promql/functions.go @@ -583,6 +583,11 @@ func funcPredictLinear(ev *evaluator, args Expressions) model.Value { } // add predicted delta to last value. + // TODO(beorn7): This is arguably suboptimal. The funcDeriv above has + // given us an estimate over the range. So we should add the delta to + // the value predicted for the end of the range. Also, once this has + // been rectified, we are not using BoundaryValues anywhere anymore, so + // we can kick out a whole lot of code. matrixBounds := ev.evalMatrixBounds(args[0]) outVec := make(vector, 0, len(signatureToDelta)) for _, samples := range matrixBounds { diff --git a/storage/local/chunk.go b/storage/local/chunk.go index 1a2a9a9836..6a63360946 100644 --- a/storage/local/chunk.go +++ b/storage/local/chunk.go @@ -53,13 +53,34 @@ const ( doubleDelta ) -// chunkDesc contains meta-data for a chunk. Many of its methods are -// goroutine-safe proxies for chunk methods. +// chunkDesc contains meta-data for a chunk. Pay special attention to the +// documented requirements for calling its method (WRT pinning and locking). +// The doc comments spell out the requirements for each method, but here is an +// overview and general explanation: +// +// Everything that changes the pinning of the underlying chunk or deals with its +// eviction is protected by a mutex. This affects the following methods: pin, +// unpin, refCount, isEvicted, maybeEvict. These methods can be called at any +// time without further prerequisites. +// +// Another group of methods acts on (or sets) the underlying chunk. These +// methods involve no locking. They may only be called if the caller has pinned +// the chunk (to guarantee the chunk is not evicted concurrently). Also, the +// caller must make sure nobody else will call these methods concurrently, +// either by holding the sole reference to the chunkDesc (usually during loading +// or creation) or by locking the fingerprint of the series the chunkDesc +// belongs to. The affected methods are: add, lastTime, maybePopulateLastTime, +// lastSamplePair, setChunk. +// +// Finally, there is the firstTime method. It merely returns the immutable +// chunkFirstTime member variable. It's arguably not needed and only there for +// consistency with lastTime. It can be called at any time and doesn't involve +// locking. type chunkDesc struct { - sync.Mutex // TODO(beorn7): Try out if an RWMutex would help here. + sync.Mutex // Protects pinning. c chunk // nil if chunk is evicted. rCnt int - chunkFirstTime model.Time // Populated at creation. + chunkFirstTime model.Time // Populated at creation. Immutable. chunkLastTime model.Time // Populated on closing of the chunk, model.Earliest if unset. // evictListElement is nil if the chunk is not in the evict list. @@ -83,16 +104,16 @@ func newChunkDesc(c chunk, firstTime model.Time) *chunkDesc { } } +// add adds a sample pair to the underlying chunk. The chunk must be pinned, and +// the caller must have locked the fingerprint of the series. func (cd *chunkDesc) add(s *model.SamplePair) []chunk { - cd.Lock() - defer cd.Unlock() - return cd.c.add(s) } // pin increments the refCount by one. Upon increment from 0 to 1, this // chunkDesc is removed from the evict list. To enable the latter, the -// evictRequests channel has to be provided. +// evictRequests channel has to be provided. This method can be called +// concurrently at any time. func (cd *chunkDesc) pin(evictRequests chan<- evictRequest) { cd.Lock() defer cd.Unlock() @@ -106,7 +127,8 @@ func (cd *chunkDesc) pin(evictRequests chan<- evictRequest) { // unpin decrements the refCount by one. Upon decrement from 1 to 0, this // chunkDesc is added to the evict list. To enable the latter, the evictRequests -// channel has to be provided. +// channel has to be provided. This method can be called concurrently at any +// time. func (cd *chunkDesc) unpin(evictRequests chan<- evictRequest) { cd.Lock() defer cd.Unlock() @@ -121,6 +143,8 @@ func (cd *chunkDesc) unpin(evictRequests chan<- evictRequest) { } } +// refCount returns the number of pins. This method can be called concurrently +// at any time. func (cd *chunkDesc) refCount() int { cd.Lock() defer cd.Unlock() @@ -128,34 +152,38 @@ func (cd *chunkDesc) refCount() int { return cd.rCnt } +// firstTime returns the timestamp of the first sample in the chunk. This method +// can be called concurrently at any time. It only returns the immutable +// cd.chunkFirstTime without any locking. Arguably, this method is +// useless. However, it provides consistency with the lastTime method. func (cd *chunkDesc) firstTime() model.Time { - // No lock required, will never be modified. return cd.chunkFirstTime } +// lastTime returns the timestamp of the last sample in the chunk. It must not +// be called concurrently with maybePopulateLastTime. If the chunkDesc is part +// of a memory series, this method requires the chunk to be pinned and the +// fingerprint of the time series to be locked. func (cd *chunkDesc) lastTime() model.Time { - cd.Lock() - defer cd.Unlock() - if cd.chunkLastTime != model.Earliest || cd.c == nil { return cd.chunkLastTime } return cd.c.newIterator().lastTimestamp() } +// maybePopulateLastTime populates the chunkLastTime from the underlying chunk +// if it has not yet happened. The chunk must be pinned, and the caller must +// have locked the fingerprint of the series. This method must not be called +// concurrently with lastTime. func (cd *chunkDesc) maybePopulateLastTime() { - cd.Lock() - defer cd.Unlock() - if cd.chunkLastTime == model.Earliest && cd.c != nil { cd.chunkLastTime = cd.c.newIterator().lastTimestamp() } } +// lastSamplePair returns the last sample pair of the underlying chunk. The +// chunk must be pinned. func (cd *chunkDesc) lastSamplePair() *model.SamplePair { - cd.Lock() - defer cd.Unlock() - if cd.c == nil { return nil } @@ -166,28 +194,22 @@ func (cd *chunkDesc) lastSamplePair() *model.SamplePair { } } +// isEvicted returns whether the chunk is evicted. The caller must have locked +// the fingerprint of the series. func (cd *chunkDesc) isEvicted() bool { + // Locking required here because we do not want the caller to force + // pinning the chunk first, so it could be evicted while this method is + // called. cd.Lock() defer cd.Unlock() return cd.c == nil } -func (cd *chunkDesc) contains(t model.Time) bool { - return !t.Before(cd.firstTime()) && !t.After(cd.lastTime()) -} - -func (cd *chunkDesc) chunk() chunk { - cd.Lock() - defer cd.Unlock() - - return cd.c -} - +// setChunk sets the underlying chunk. The caller must have locked the +// fingerprint of the series and must have "pre-pinned" the chunk (i.e. first +// call pin and then set the chunk). func (cd *chunkDesc) setChunk(c chunk) { - cd.Lock() - defer cd.Unlock() - if cd.c != nil { panic("chunk already set") } @@ -196,7 +218,7 @@ func (cd *chunkDesc) setChunk(c chunk) { // maybeEvict evicts the chunk if the refCount is 0. It returns whether the chunk // is now evicted, which includes the case that the chunk was evicted even -// before this method was called. +// before this method was called. It can be called concurrently at any time. func (cd *chunkDesc) maybeEvict() bool { cd.Lock() defer cd.Unlock() @@ -207,7 +229,10 @@ func (cd *chunkDesc) maybeEvict() bool { if cd.rCnt != 0 { return false } - // Last opportunity to populate chunkLastTime. + // Last opportunity to populate chunkLastTime. This is a safety + // guard. Regularly, chunkLastTime should be populated upon completion + // of a chunk before persistence can kick to unpin it (and thereby + // making it evictable in the first place). if cd.chunkLastTime == model.Earliest { cd.chunkLastTime = cd.c.newIterator().lastTimestamp() } @@ -251,12 +276,11 @@ type chunkIterator interface { sampleValueAtIndex(int) model.SampleValue // Gets the last sample value in the chunk. lastSampleValue() model.SampleValue - // Gets the two values that are immediately adjacent to a given time. In - // case a value exist at precisely the given time, only that single - // value is returned. Only the first or last value is returned (as a - // single value), if the given time is before or after the first or last - // value, respectively. - valueAtTime(model.Time) []model.SamplePair + // Gets the value that is closest before the given time. In case a value + // exist at precisely the given time, that value is returned. If no + // applicable value exists, a SamplePair with timestamp model.Earliest + // and value 0.0 is returned. + valueAtOrBeforeTime(model.Time) model.SamplePair // Gets all values contained within a given interval. rangeValues(metric.Interval) []model.SamplePair // Whether a given timestamp is contained between first and last value diff --git a/storage/local/delta.go b/storage/local/delta.go index 5d069afd6b..7222c5a157 100644 --- a/storage/local/delta.go +++ b/storage/local/delta.go @@ -301,41 +301,17 @@ type deltaEncodedChunkIterator struct { // length implements chunkIterator. func (it *deltaEncodedChunkIterator) length() int { return it.len } -// valueAtTime implements chunkIterator. -func (it *deltaEncodedChunkIterator) valueAtTime(t model.Time) []model.SamplePair { +// valueAtOrBeforeTime implements chunkIterator. +func (it *deltaEncodedChunkIterator) valueAtOrBeforeTime(t model.Time) model.SamplePair { i := sort.Search(it.len, func(i int) bool { - return !it.timestampAtIndex(i).Before(t) + return it.timestampAtIndex(i).After(t) }) - - switch i { - case 0: - return []model.SamplePair{{ - Timestamp: it.timestampAtIndex(0), - Value: it.sampleValueAtIndex(0), - }} - case it.len: - return []model.SamplePair{{ - Timestamp: it.timestampAtIndex(it.len - 1), - Value: it.sampleValueAtIndex(it.len - 1), - }} - default: - ts := it.timestampAtIndex(i) - if ts.Equal(t) { - return []model.SamplePair{{ - Timestamp: ts, - Value: it.sampleValueAtIndex(i), - }} - } - return []model.SamplePair{ - { - Timestamp: it.timestampAtIndex(i - 1), - Value: it.sampleValueAtIndex(i - 1), - }, - { - Timestamp: ts, - Value: it.sampleValueAtIndex(i), - }, - } + if i == 0 { + return model.SamplePair{Timestamp: model.Earliest} + } + return model.SamplePair{ + Timestamp: it.timestampAtIndex(i - 1), + Value: it.sampleValueAtIndex(i - 1), } } diff --git a/storage/local/doubledelta.go b/storage/local/doubledelta.go index eaf3093cb9..e0e0856ce6 100644 --- a/storage/local/doubledelta.go +++ b/storage/local/doubledelta.go @@ -407,41 +407,17 @@ type doubleDeltaEncodedChunkIterator struct { // length implements chunkIterator. func (it *doubleDeltaEncodedChunkIterator) length() int { return it.len } -// valueAtTime implements chunkIterator. -func (it *doubleDeltaEncodedChunkIterator) valueAtTime(t model.Time) []model.SamplePair { +// valueAtOrBeforeTime implements chunkIterator. +func (it *doubleDeltaEncodedChunkIterator) valueAtOrBeforeTime(t model.Time) model.SamplePair { i := sort.Search(it.len, func(i int) bool { - return !it.timestampAtIndex(i).Before(t) + return it.timestampAtIndex(i).After(t) }) - - switch i { - case 0: - return []model.SamplePair{{ - Timestamp: it.timestampAtIndex(0), - Value: it.sampleValueAtIndex(0), - }} - case it.len: - return []model.SamplePair{{ - Timestamp: it.timestampAtIndex(it.len - 1), - Value: it.sampleValueAtIndex(it.len - 1), - }} - default: - ts := it.timestampAtIndex(i) - if ts.Equal(t) { - return []model.SamplePair{{ - Timestamp: ts, - Value: it.sampleValueAtIndex(i), - }} - } - return []model.SamplePair{ - { - Timestamp: it.timestampAtIndex(i - 1), - Value: it.sampleValueAtIndex(i - 1), - }, - { - Timestamp: ts, - Value: it.sampleValueAtIndex(i), - }, - } + if i == 0 { + return model.SamplePair{Timestamp: model.Earliest} + } + return model.SamplePair{ + Timestamp: it.timestampAtIndex(i - 1), + Value: it.sampleValueAtIndex(i - 1), } } diff --git a/storage/local/interface.go b/storage/local/interface.go index 454c2d9d5f..a62a13b19a 100644 --- a/storage/local/interface.go +++ b/storage/local/interface.go @@ -14,8 +14,6 @@ package local import ( - "time" - "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" @@ -52,10 +50,6 @@ type Storage interface { LabelValuesForLabelName(model.LabelName) model.LabelValues // Get the metric associated with the provided fingerprint. MetricForFingerprint(model.Fingerprint) metric.Metric - // Construct an iterator for a given fingerprint. - // The iterator will never return samples older than retention time, - // relative to the time NewIterator was called. - NewIterator(model.Fingerprint) SeriesIterator // Drop all time series associated with the given fingerprints. DropMetricsForFingerprints(...model.Fingerprint) // Run the various maintenance loops in goroutines. Returns when the @@ -77,12 +71,11 @@ type Storage interface { // modifying the corresponding series, but the iterator will represent the state // of the series prior the modification. type SeriesIterator interface { - // Gets the two values that are immediately adjacent to a given time. In - // case a value exist at precisely the given time, only that single - // value is returned. Only the first or last value is returned (as a - // single value), if the given time is before or after the first or last - // value, respectively. - ValueAtTime(model.Time) []model.SamplePair + // Gets the value that is closest before the given time. In case a value + // exist at precisely the given time, that value is returned. If no + // applicable value exists, a SamplePair with timestamp model.Earliest + // and value 0.0 is returned. + ValueAtOrBeforeTime(model.Time) model.SamplePair // Gets the boundary values of an interval: the first and last value // within a given interval. BoundaryValues(metric.Interval) []model.SamplePair @@ -90,15 +83,14 @@ type SeriesIterator interface { RangeValues(metric.Interval) []model.SamplePair } -// A Preloader preloads series data necessary for a query into memory and pins -// them until released via Close(). Its methods are generally not -// goroutine-safe. +// A Preloader preloads series data necessary for a query into memory, pins it +// until released via Close(), and returns an iterator for the pinned data. Its +// methods are generally not goroutine-safe. type Preloader interface { PreloadRange( fp model.Fingerprint, from model.Time, through model.Time, - stalenessDelta time.Duration, - ) error + ) (SeriesIterator, error) // Close unpins any previously requested series data from memory. Close() } diff --git a/storage/local/preload.go b/storage/local/preload.go index 0fc5030d7a..cda04a864b 100644 --- a/storage/local/preload.go +++ b/storage/local/preload.go @@ -13,11 +13,7 @@ package local -import ( - "time" - - "github.com/prometheus/common/model" -) +import "github.com/prometheus/common/model" // memorySeriesPreloader is a Preloader for the memorySeriesStorage. type memorySeriesPreloader struct { @@ -29,74 +25,15 @@ type memorySeriesPreloader struct { func (p *memorySeriesPreloader) PreloadRange( fp model.Fingerprint, from model.Time, through model.Time, - stalenessDelta time.Duration, -) error { - cds, err := p.storage.preloadChunksForRange(fp, from, through, stalenessDelta) +) (SeriesIterator, error) { + cds, iter, err := p.storage.preloadChunksForRange(fp, from, through) if err != nil { - return err + return iter, err } p.pinnedChunkDescs = append(p.pinnedChunkDescs, cds...) - return nil + return iter, nil } -/* -// MetricAtTime implements Preloader. -func (p *memorySeriesPreloader) MetricAtTime(fp model.Fingerprint, t model.Time) error { - cds, err := p.storage.preloadChunks(fp, &timeSelector{ - from: t, - through: t, - }) - if err != nil { - return err - } - p.pinnedChunkDescs = append(p.pinnedChunkDescs, cds...) - return nil -} - -// MetricAtInterval implements Preloader. -func (p *memorySeriesPreloader) MetricAtInterval(fp model.Fingerprint, from, through model.Time, interval time.Duration) error { - cds, err := p.storage.preloadChunks(fp, &timeSelector{ - from: from, - through: through, - interval: interval, - }) - if err != nil { - return err - } - p.pinnedChunkDescs = append(p.pinnedChunkDescs, cds...) - return -} - -// MetricRange implements Preloader. -func (p *memorySeriesPreloader) MetricRange(fp model.Fingerprint, t model.Time, rangeDuration time.Duration) error { - cds, err := p.storage.preloadChunks(fp, &timeSelector{ - from: t, - through: t, - rangeDuration: through.Sub(from), - }) - if err != nil { - return err - } - p.pinnedChunkDescs = append(p.pinnedChunkDescs, cds...) - return -} - -// MetricRangeAtInterval implements Preloader. -func (p *memorySeriesPreloader) MetricRangeAtInterval(fp model.Fingerprint, from, through model.Time, interval, rangeDuration time.Duration) error { - cds, err := p.storage.preloadChunks(fp, &timeSelector{ - from: from, - through: through, - interval: interval, - rangeDuration: rangeDuration, - }) - if err != nil { - return err - } - p.pinnedChunkDescs = append(p.pinnedChunkDescs, cds...) - return -} -*/ - // Close implements Preloader. func (p *memorySeriesPreloader) Close() { for _, cd := range p.pinnedChunkDescs { diff --git a/storage/local/series.go b/storage/local/series.go index d731548810..6943e925a9 100644 --- a/storage/local/series.go +++ b/storage/local/series.go @@ -315,7 +315,7 @@ func (s *memorySeries) dropChunks(t model.Time) { // preloadChunks is an internal helper method. func (s *memorySeries) preloadChunks( indexes []int, fp model.Fingerprint, mss *memorySeriesStorage, -) ([]*chunkDesc, error) { +) ([]*chunkDesc, SeriesIterator, error) { loadIndexes := []int{} pinnedChunkDescs := make([]*chunkDesc, 0, len(indexes)) for _, idx := range indexes { @@ -339,52 +339,47 @@ func (s *memorySeries) preloadChunks( cd.unpin(mss.evictRequests) } chunkOps.WithLabelValues(unpin).Add(float64(len(pinnedChunkDescs))) - return nil, err + return nil, nopIter, err } for i, c := range chunks { s.chunkDescs[loadIndexes[i]].setChunk(c) } } - return pinnedChunkDescs, nil -} -/* -func (s *memorySeries) preloadChunksAtTime(t model.Time, p *persistence) (chunkDescs, error) { - s.mtx.Lock() - defer s.mtx.Unlock() - - if len(s.chunkDescs) == 0 { - return nil, nil + if !s.headChunkClosed && indexes[len(indexes)-1] == len(s.chunkDescs)-1 { + s.headChunkUsedByIterator = true } - var pinIndexes []int - // Find first chunk where lastTime() is after or equal to t. - i := sort.Search(len(s.chunkDescs), func(i int) bool { - return !s.chunkDescs[i].lastTime().Before(t) - }) - switch i { - case 0: - pinIndexes = []int{0} - case len(s.chunkDescs): - pinIndexes = []int{i - 1} - default: - if s.chunkDescs[i].contains(t) { - pinIndexes = []int{i} - } else { - pinIndexes = []int{i - 1, i} - } + iter := &boundedIterator{ + it: s.newIterator(pinnedChunkDescs), + start: model.Now().Add(-mss.dropAfter), } - return s.preloadChunks(pinIndexes, p) + return pinnedChunkDescs, iter, nil +} + +// newIterator returns a new SeriesIterator for the provided chunkDescs (which +// must be pinned). The caller must have locked the fingerprint of the +// memorySeries. +func (s *memorySeries) newIterator(pinnedChunkDescs []*chunkDesc) SeriesIterator { + chunks := make([]chunk, 0, len(pinnedChunkDescs)) + for _, cd := range pinnedChunkDescs { + // It's OK to directly access cd.c here (without locking) as the + // series FP is locked and the chunk is pinned. + chunks = append(chunks, cd.c) + } + return &memorySeriesIterator{ + chunks: chunks, + chunkIts: make([]chunkIterator, len(chunks)), + } } -*/ // preloadChunksForRange loads chunks for the given range from the persistence. // The caller must have locked the fingerprint of the series. func (s *memorySeries) preloadChunksForRange( from model.Time, through model.Time, fp model.Fingerprint, mss *memorySeriesStorage, -) ([]*chunkDesc, error) { +) ([]*chunkDesc, SeriesIterator, error) { firstChunkDescTime := model.Latest if len(s.chunkDescs) > 0 { firstChunkDescTime = s.chunkDescs[0].firstTime() @@ -392,15 +387,16 @@ func (s *memorySeries) preloadChunksForRange( if s.chunkDescsOffset != 0 && from.Before(firstChunkDescTime) { cds, err := mss.loadChunkDescs(fp, s.persistWatermark) if err != nil { - return nil, err + return nil, nopIter, err } s.chunkDescs = append(cds, s.chunkDescs...) s.chunkDescsOffset = 0 s.persistWatermark += len(cds) + firstChunkDescTime = s.chunkDescs[0].firstTime() } - if len(s.chunkDescs) == 0 { - return nil, nil + if len(s.chunkDescs) == 0 || through.Before(firstChunkDescTime) { + return nil, nopIter, nil } // Find first chunk with start time after "from". @@ -411,6 +407,13 @@ func (s *memorySeries) preloadChunksForRange( throughIdx := sort.Search(len(s.chunkDescs), func(i int) bool { return s.chunkDescs[i].firstTime().After(through) }) + if fromIdx == len(s.chunkDescs) { + // Even the last chunk starts before "from". Find out if the + // series ends before "from" and we don't need to do anything. + if s.chunkDescs[len(s.chunkDescs)-1].lastTime().Before(from) { + return nil, nopIter, nil + } + } if fromIdx > 0 { fromIdx-- } @@ -425,25 +428,6 @@ func (s *memorySeries) preloadChunksForRange( return s.preloadChunks(pinIndexes, fp, mss) } -// newIterator returns a new SeriesIterator. The caller must have locked the -// fingerprint of the memorySeries. -func (s *memorySeries) newIterator() SeriesIterator { - chunks := make([]chunk, 0, len(s.chunkDescs)) - for i, cd := range s.chunkDescs { - if chunk := cd.chunk(); chunk != nil { - if i == len(s.chunkDescs)-1 && !s.headChunkClosed { - s.headChunkUsedByIterator = true - } - chunks = append(chunks, chunk) - } - } - - return &memorySeriesIterator{ - chunks: chunks, - chunkIts: make([]chunkIterator, len(chunks)), - } -} - // head returns a pointer to the head chunk descriptor. The caller must have // locked the fingerprint of the memorySeries. This method will panic if this // series has no chunk descriptors. @@ -482,70 +466,33 @@ func (s *memorySeries) chunksToPersist() []*chunkDesc { // memorySeriesIterator implements SeriesIterator. type memorySeriesIterator struct { - chunkIt chunkIterator // Last chunkIterator used by ValueAtTime. + chunkIt chunkIterator // Last chunkIterator used by ValueAtOrBeforeTime. chunkIts []chunkIterator // Caches chunkIterators. chunks []chunk } -// ValueAtTime implements SeriesIterator. -func (it *memorySeriesIterator) ValueAtTime(t model.Time) []model.SamplePair { +// ValueAtOrBeforeTime implements SeriesIterator. +func (it *memorySeriesIterator) ValueAtOrBeforeTime(t model.Time) model.SamplePair { // The most common case. We are iterating through a chunk. if it.chunkIt != nil && it.chunkIt.contains(t) { - return it.chunkIt.valueAtTime(t) + return it.chunkIt.valueAtOrBeforeTime(t) } if len(it.chunks) == 0 { - return nil + return model.SamplePair{Timestamp: model.Earliest} } - // Before or exactly on the first sample of the series. - it.chunkIt = it.chunkIterator(0) - ts := it.chunkIt.timestampAtIndex(0) - if !t.After(ts) { - // return first value of first chunk - return []model.SamplePair{{ - Timestamp: ts, - Value: it.chunkIt.sampleValueAtIndex(0), - }} - } - - // After or exactly on the last sample of the series. - it.chunkIt = it.chunkIterator(len(it.chunks) - 1) - ts = it.chunkIt.lastTimestamp() - if !t.Before(ts) { - // return last value of last chunk - return []model.SamplePair{{ - Timestamp: ts, - Value: it.chunkIt.sampleValueAtIndex(it.chunkIt.length() - 1), - }} - } - - // Find last chunk where firstTime() is before or equal to t. + // Find the last chunk where firstTime() is before or equal to t. l := len(it.chunks) - 1 i := sort.Search(len(it.chunks), func(i int) bool { return !it.chunks[l-i].firstTime().After(t) }) if i == len(it.chunks) { - panic("out of bounds") + // Even the first chunk starts after t. + return model.SamplePair{Timestamp: model.Earliest} } it.chunkIt = it.chunkIterator(l - i) - ts = it.chunkIt.lastTimestamp() - if t.After(ts) { - // We ended up between two chunks. - sp1 := model.SamplePair{ - Timestamp: ts, - Value: it.chunkIt.sampleValueAtIndex(it.chunkIt.length() - 1), - } - it.chunkIt = it.chunkIterator(l - i + 1) - return []model.SamplePair{ - sp1, - { - Timestamp: it.chunkIt.timestampAtIndex(0), - Value: it.chunkIt.sampleValueAtIndex(0), - }, - } - } - return it.chunkIt.valueAtTime(t) + return it.chunkIt.valueAtOrBeforeTime(t) } // BoundaryValues implements SeriesIterator. @@ -578,18 +525,24 @@ func (it *memorySeriesIterator) BoundaryValues(in metric.Interval) []model.Sampl } chunkIt := it.chunkIterator(i + j) if len(values) == 0 { - firstValues := chunkIt.valueAtTime(in.OldestInclusive) - switch len(firstValues) { - case 2: - values = append(values, firstValues[1]) - case 1: - values = firstValues - default: - panic("unexpected return from valueAtTime") + for s := range chunkIt.values() { + if len(values) == 0 && !s.Timestamp.Before(in.OldestInclusive) { + values = append(values, *s) + // We cannot just break out here as we have to consume all + // the values to not leak a goroutine. This could obviously + // be made much neater with more suitable methods in the chunk + // interface. But currently, BoundaryValues is only used by + // `predict_linear` so we would pollute the chunk interface + // unduly just for one single corner case. Plus, even that use + // of BoundaryValues is suboptimal and should be replaced. + } } } if chunkIt.lastTimestamp().After(in.NewestInclusive) { - values = append(values, chunkIt.valueAtTime(in.NewestInclusive)[0]) + s := chunkIt.valueAtOrBeforeTime(in.NewestInclusive) + if s.Timestamp != model.Earliest { + values = append(values, s) + } break } } @@ -644,8 +597,8 @@ func (it *memorySeriesIterator) chunkIterator(i int) chunkIterator { type nopSeriesIterator struct{} // ValueAtTime implements SeriesIterator. -func (i nopSeriesIterator) ValueAtTime(t model.Time) []model.SamplePair { - return []model.SamplePair{} +func (i nopSeriesIterator) ValueAtOrBeforeTime(t model.Time) model.SamplePair { + return model.SamplePair{Timestamp: model.Earliest} } // BoundaryValues implements SeriesIterator. @@ -657,3 +610,5 @@ func (i nopSeriesIterator) BoundaryValues(in metric.Interval) []model.SamplePair func (i nopSeriesIterator) RangeValues(in metric.Interval) []model.SamplePair { return []model.SamplePair{} } + +var nopIter nopSeriesIterator // A nopSeriesIterator for convenience. Can be shared. diff --git a/storage/local/storage.go b/storage/local/storage.go index f828c01fd0..2f869b5ad4 100644 --- a/storage/local/storage.go +++ b/storage/local/storage.go @@ -345,26 +345,6 @@ func (s *memorySeriesStorage) WaitForIndexing() { s.persistence.waitForIndexing() } -// NewIterator implements Storage. -func (s *memorySeriesStorage) NewIterator(fp model.Fingerprint) SeriesIterator { - s.fpLocker.Lock(fp) - defer s.fpLocker.Unlock(fp) - - series, ok := s.fpToSeries.get(fp) - if !ok { - // Oops, no series for fp found. That happens if, after - // preloading is done, the whole series is identified as old - // enough for purging and hence purged for good. As there is no - // data left to iterate over, return an iterator that will never - // return any values. - return nopSeriesIterator{} - } - return &boundedIterator{ - it: series.newIterator(), - start: model.Now().Add(-s.dropAfter), - } -} - // LastSampleForFingerprint implements Storage. func (s *memorySeriesStorage) LastSamplePairForFingerprint(fp model.Fingerprint) *model.SamplePair { s.fpLocker.Lock(fp) @@ -384,12 +364,12 @@ type boundedIterator struct { start model.Time } -// ValueAtTime implements the SeriesIterator interface. -func (bit *boundedIterator) ValueAtTime(ts model.Time) []model.SamplePair { +// ValueAtOrBeforeTime implements the SeriesIterator interface. +func (bit *boundedIterator) ValueAtOrBeforeTime(ts model.Time) model.SamplePair { if ts < bit.start { - return []model.SamplePair{} + return model.SamplePair{Timestamp: model.Earliest} } - return bit.it.ValueAtTime(ts) + return bit.it.ValueAtOrBeforeTime(ts) } // BoundaryValues implements the SeriesIterator interface. @@ -569,6 +549,8 @@ func (s *memorySeriesStorage) DropMetricsForFingerprints(fps ...model.Fingerprin } } +// ErrOutOfOrderSample is returned if a sample has a timestamp before the latest +// timestamp in the series it is appended to. var ErrOutOfOrderSample = fmt.Errorf("sample timestamp out of order") // Append implements Storage. @@ -705,8 +687,7 @@ func (s *memorySeriesStorage) getOrCreateSeries(fp model.Fingerprint, m model.Me func (s *memorySeriesStorage) preloadChunksForRange( fp model.Fingerprint, from model.Time, through model.Time, - stalenessDelta time.Duration, -) ([]*chunkDesc, error) { +) ([]*chunkDesc, SeriesIterator, error) { s.fpLocker.Lock(fp) defer s.fpLocker.Unlock(fp) @@ -714,20 +695,20 @@ func (s *memorySeriesStorage) preloadChunksForRange( if !ok { has, first, last, err := s.persistence.hasArchivedMetric(fp) if err != nil { - return nil, err + return nil, nopIter, err } if !has { s.invalidPreloadRequestsCount.Inc() - return nil, nil + return nil, nopIter, nil } - if from.Add(-stalenessDelta).Before(last) && through.Add(stalenessDelta).After(first) { + if from.Before(last) && through.After(first) { metric, err := s.persistence.archivedMetric(fp) if err != nil { - return nil, err + return nil, nopIter, err } series = s.getOrCreateSeries(fp, metric) } else { - return nil, nil + return nil, nopIter, nil } } return series.preloadChunksForRange(from, through, fp, s) diff --git a/storage/local/storage_test.go b/storage/local/storage_test.go index 74bb631535..4c7034f646 100644 --- a/storage/local/storage_test.go +++ b/storage/local/storage_test.go @@ -405,19 +405,17 @@ func TestRetentionCutoff(t *testing.T) { defer pl.Close() // Preload everything. - err := pl.PreloadRange(fp, insertStart, now, 5*time.Minute) + it, err := pl.PreloadRange(fp, insertStart, now) if err != nil { t.Fatalf("Error preloading outdated chunks: %s", err) } - it := s.NewIterator(fp) - - vals := it.ValueAtTime(now.Add(-61 * time.Minute)) - if len(vals) != 0 { + val := it.ValueAtOrBeforeTime(now.Add(-61 * time.Minute)) + if val.Timestamp != model.Earliest { t.Errorf("unexpected result for timestamp before retention period") } - vals = it.RangeValues(metric.Interval{OldestInclusive: insertStart, NewestInclusive: now}) + vals := it.RangeValues(metric.Interval{OldestInclusive: insertStart, NewestInclusive: now}) // We get 59 values here because the model.Now() is slightly later // than our now. if len(vals) != 59 { @@ -502,11 +500,18 @@ func TestDropMetrics(t *testing.T) { t.Errorf("unexpected number of fingerprints: %d", len(fps2)) } - it := s.NewIterator(fpList[0]) + _, it, err := s.preloadChunksForRange(fpList[0], model.Earliest, model.Latest) + if err != nil { + t.Fatalf("Error preloading everything: %s", err) + } if vals := it.RangeValues(metric.Interval{OldestInclusive: insertStart, NewestInclusive: now}); len(vals) != 0 { t.Errorf("unexpected number of samples: %d", len(vals)) } - it = s.NewIterator(fpList[1]) + + _, it, err = s.preloadChunksForRange(fpList[1], model.Earliest, model.Latest) + if err != nil { + t.Fatalf("Error preloading everything: %s", err) + } if vals := it.RangeValues(metric.Interval{OldestInclusive: insertStart, NewestInclusive: now}); len(vals) != N { t.Errorf("unexpected number of samples: %d", len(vals)) } @@ -528,11 +533,18 @@ func TestDropMetrics(t *testing.T) { t.Errorf("unexpected number of fingerprints: %d", len(fps3)) } - it = s.NewIterator(fpList[0]) + _, it, err = s.preloadChunksForRange(fpList[0], model.Earliest, model.Latest) + if err != nil { + t.Fatalf("Error preloading everything: %s", err) + } if vals := it.RangeValues(metric.Interval{OldestInclusive: insertStart, NewestInclusive: now}); len(vals) != 0 { t.Errorf("unexpected number of samples: %d", len(vals)) } - it = s.NewIterator(fpList[1]) + + _, it, err = s.preloadChunksForRange(fpList[1], model.Earliest, model.Latest) + if err != nil { + t.Fatalf("Error preloading everything: %s", err) + } if vals := it.RangeValues(metric.Interval{OldestInclusive: insertStart, NewestInclusive: now}); len(vals) != 0 { t.Errorf("unexpected number of samples: %d", len(vals)) } @@ -640,7 +652,7 @@ func TestChunkType1(t *testing.T) { testChunk(t, 1) } -func testValueAtTime(t *testing.T, encoding chunkEncoding) { +func testValueAtOrBeforeTime(t *testing.T, encoding chunkEncoding) { samples := make(model.Samples, 10000) for i := range samples { samples[i] = &model.Sample{ @@ -658,82 +670,66 @@ func testValueAtTime(t *testing.T, encoding chunkEncoding) { fp := model.Metric{}.FastFingerprint() - it := s.NewIterator(fp) + _, it, err := s.preloadChunksForRange(fp, model.Earliest, model.Latest) + if err != nil { + t.Fatalf("Error preloading everything: %s", err) + } // #1 Exactly on a sample. for i, expected := range samples { - actual := it.ValueAtTime(expected.Timestamp) + actual := it.ValueAtOrBeforeTime(expected.Timestamp) - if len(actual) != 1 { - t.Fatalf("1.%d. Expected exactly one result, got %d.", i, len(actual)) + if expected.Timestamp != actual.Timestamp { + t.Errorf("1.%d. Got %v; want %v", i, actual.Timestamp, expected.Timestamp) } - if expected.Timestamp != actual[0].Timestamp { - t.Errorf("1.%d. Got %v; want %v", i, actual[0].Timestamp, expected.Timestamp) - } - if expected.Value != actual[0].Value { - t.Errorf("1.%d. Got %v; want %v", i, actual[0].Value, expected.Value) + if expected.Value != actual.Value { + t.Errorf("1.%d. Got %v; want %v", i, actual.Value, expected.Value) } } // #2 Between samples. - for i, expected1 := range samples { + for i, expected := range samples { if i == len(samples)-1 { continue } - expected2 := samples[i+1] - actual := it.ValueAtTime(expected1.Timestamp + 1) + actual := it.ValueAtOrBeforeTime(expected.Timestamp + 1) - if len(actual) != 2 { - t.Fatalf("2.%d. Expected exactly 2 results, got %d.", i, len(actual)) + if expected.Timestamp != actual.Timestamp { + t.Errorf("2.%d. Got %v; want %v", i, actual.Timestamp, expected.Timestamp) } - if expected1.Timestamp != actual[0].Timestamp { - t.Errorf("2.%d. Got %v; want %v", i, actual[0].Timestamp, expected1.Timestamp) - } - if expected1.Value != actual[0].Value { - t.Errorf("2.%d. Got %v; want %v", i, actual[0].Value, expected1.Value) - } - if expected2.Timestamp != actual[1].Timestamp { - t.Errorf("2.%d. Got %v; want %v", i, actual[1].Timestamp, expected1.Timestamp) - } - if expected2.Value != actual[1].Value { - t.Errorf("2.%d. Got %v; want %v", i, actual[1].Value, expected1.Value) + if expected.Value != actual.Value { + t.Errorf("2.%d. Got %v; want %v", i, actual.Value, expected.Value) } } // #3 Corner cases: Just before the first sample, just after the last. - expected := samples[0] - actual := it.ValueAtTime(expected.Timestamp - 1) - if len(actual) != 1 { - t.Fatalf("3.1. Expected exactly one result, got %d.", len(actual)) + expected := &model.Sample{Timestamp: model.Earliest} + actual := it.ValueAtOrBeforeTime(samples[0].Timestamp - 1) + if expected.Timestamp != actual.Timestamp { + t.Errorf("3.1. Got %v; want %v", actual.Timestamp, expected.Timestamp) } - if expected.Timestamp != actual[0].Timestamp { - t.Errorf("3.1. Got %v; want %v", actual[0].Timestamp, expected.Timestamp) - } - if expected.Value != actual[0].Value { - t.Errorf("3.1. Got %v; want %v", actual[0].Value, expected.Value) + if expected.Value != actual.Value { + t.Errorf("3.1. Got %v; want %v", actual.Value, expected.Value) } expected = samples[len(samples)-1] - actual = it.ValueAtTime(expected.Timestamp + 1) - if len(actual) != 1 { - t.Fatalf("3.2. Expected exactly one result, got %d.", len(actual)) + actual = it.ValueAtOrBeforeTime(expected.Timestamp + 1) + if expected.Timestamp != actual.Timestamp { + t.Errorf("3.2. Got %v; want %v", actual.Timestamp, expected.Timestamp) } - if expected.Timestamp != actual[0].Timestamp { - t.Errorf("3.2. Got %v; want %v", actual[0].Timestamp, expected.Timestamp) - } - if expected.Value != actual[0].Value { - t.Errorf("3.2. Got %v; want %v", actual[0].Value, expected.Value) + if expected.Value != actual.Value { + t.Errorf("3.2. Got %v; want %v", actual.Value, expected.Value) } } func TestValueAtTimeChunkType0(t *testing.T) { - testValueAtTime(t, 0) + testValueAtOrBeforeTime(t, 0) } func TestValueAtTimeChunkType1(t *testing.T) { - testValueAtTime(t, 1) + testValueAtOrBeforeTime(t, 1) } -func benchmarkValueAtTime(b *testing.B, encoding chunkEncoding) { +func benchmarkValueAtOrBeforeTime(b *testing.B, encoding chunkEncoding) { samples := make(model.Samples, 10000) for i := range samples { samples[i] = &model.Sample{ @@ -751,59 +747,67 @@ func benchmarkValueAtTime(b *testing.B, encoding chunkEncoding) { fp := model.Metric{}.FastFingerprint() + _, it, err := s.preloadChunksForRange(fp, model.Earliest, model.Latest) + if err != nil { + b.Fatalf("Error preloading everything: %s", err) + } + b.ResetTimer() for i := 0; i < b.N; i++ { - it := s.NewIterator(fp) - // #1 Exactly on a sample. for i, expected := range samples { - actual := it.ValueAtTime(expected.Timestamp) + actual := it.ValueAtOrBeforeTime(expected.Timestamp) - if len(actual) != 1 { - b.Fatalf("1.%d. Expected exactly one result, got %d.", i, len(actual)) + if expected.Timestamp != actual.Timestamp { + b.Errorf("1.%d. Got %v; want %v", i, actual.Timestamp, expected.Timestamp) } - if expected.Timestamp != actual[0].Timestamp { - b.Errorf("1.%d. Got %v; want %v", i, actual[0].Timestamp, expected.Timestamp) - } - if expected.Value != actual[0].Value { - b.Errorf("1.%d. Got %v; want %v", i, actual[0].Value, expected.Value) + if expected.Value != actual.Value { + b.Errorf("1.%d. Got %v; want %v", i, actual.Value, expected.Value) } } // #2 Between samples. - for i, expected1 := range samples { + for i, expected := range samples { if i == len(samples)-1 { continue } - expected2 := samples[i+1] - actual := it.ValueAtTime(expected1.Timestamp + 1) + actual := it.ValueAtOrBeforeTime(expected.Timestamp + 1) - if len(actual) != 2 { - b.Fatalf("2.%d. Expected exactly 2 results, got %d.", i, len(actual)) + if expected.Timestamp != actual.Timestamp { + b.Errorf("2.%d. Got %v; want %v", i, actual.Timestamp, expected.Timestamp) } - if expected1.Timestamp != actual[0].Timestamp { - b.Errorf("2.%d. Got %v; want %v", i, actual[0].Timestamp, expected1.Timestamp) - } - if expected1.Value != actual[0].Value { - b.Errorf("2.%d. Got %v; want %v", i, actual[0].Value, expected1.Value) - } - if expected2.Timestamp != actual[1].Timestamp { - b.Errorf("2.%d. Got %v; want %v", i, actual[1].Timestamp, expected1.Timestamp) - } - if expected2.Value != actual[1].Value { - b.Errorf("2.%d. Got %v; want %v", i, actual[1].Value, expected1.Value) + if expected.Value != actual.Value { + b.Errorf("2.%d. Got %v; want %v", i, actual.Value, expected.Value) } } + + // #3 Corner cases: Just before the first sample, just after the last. + expected := &model.Sample{Timestamp: model.Earliest} + actual := it.ValueAtOrBeforeTime(samples[0].Timestamp - 1) + if expected.Timestamp != actual.Timestamp { + b.Errorf("3.1. Got %v; want %v", actual.Timestamp, expected.Timestamp) + } + if expected.Value != actual.Value { + b.Errorf("3.1. Got %v; want %v", actual.Value, expected.Value) + } + expected = samples[len(samples)-1] + actual = it.ValueAtOrBeforeTime(expected.Timestamp + 1) + if expected.Timestamp != actual.Timestamp { + b.Errorf("3.2. Got %v; want %v", actual.Timestamp, expected.Timestamp) + } + if expected.Value != actual.Value { + b.Errorf("3.2. Got %v; want %v", actual.Value, expected.Value) + } } } -func BenchmarkValueAtTimeChunkType0(b *testing.B) { - benchmarkValueAtTime(b, 0) +func BenchmarkValueAtOrBeforeTimeChunkType0(b *testing.B) { + benchmarkValueAtOrBeforeTime(b, 0) } func BenchmarkValueAtTimeChunkType1(b *testing.B) { - benchmarkValueAtTime(b, 1) + benchmarkValueAtOrBeforeTime(b, 1) } func testRangeValues(t *testing.T, encoding chunkEncoding) { @@ -824,7 +828,10 @@ func testRangeValues(t *testing.T, encoding chunkEncoding) { fp := model.Metric{}.FastFingerprint() - it := s.NewIterator(fp) + _, it, err := s.preloadChunksForRange(fp, model.Earliest, model.Latest) + if err != nil { + t.Fatalf("Error preloading everything: %s", err) + } // #1 Zero length interval at sample. for i, expected := range samples { @@ -976,12 +983,14 @@ func benchmarkRangeValues(b *testing.B, encoding chunkEncoding) { fp := model.Metric{}.FastFingerprint() + _, it, err := s.preloadChunksForRange(fp, model.Earliest, model.Latest) + if err != nil { + b.Fatalf("Error preloading everything: %s", err) + } + b.ResetTimer() for i := 0; i < b.N; i++ { - - it := s.NewIterator(fp) - for _, sample := range samples { actual := it.RangeValues(metric.Interval{ OldestInclusive: sample.Timestamp - 20, @@ -1023,7 +1032,10 @@ func testEvictAndPurgeSeries(t *testing.T, encoding chunkEncoding) { // Drop ~half of the chunks. s.maintainMemorySeries(fp, 10000) - it := s.NewIterator(fp) + _, it, err := s.preloadChunksForRange(fp, model.Earliest, model.Latest) + if err != nil { + t.Fatalf("Error preloading everything: %s", err) + } actual := it.BoundaryValues(metric.Interval{ OldestInclusive: 0, NewestInclusive: 100000, @@ -1041,7 +1053,10 @@ func testEvictAndPurgeSeries(t *testing.T, encoding chunkEncoding) { // Drop everything. s.maintainMemorySeries(fp, 100000) - it = s.NewIterator(fp) + _, it, err = s.preloadChunksForRange(fp, model.Earliest, model.Latest) + if err != nil { + t.Fatalf("Error preloading everything: %s", err) + } actual = it.BoundaryValues(metric.Interval{ OldestInclusive: 0, NewestInclusive: 100000, @@ -1215,7 +1230,7 @@ func testEvictAndLoadChunkDescs(t *testing.T, encoding chunkEncoding) { // Load everything back. p := s.NewPreloader() - p.PreloadRange(fp, 0, 100000, time.Hour) + p.PreloadRange(fp, 0, 100000) if oldLen != len(series.chunkDescs) { t.Errorf("Expected number of chunkDescs to have reached old value again, old number %d, current number %d.", oldLen, len(series.chunkDescs)) @@ -1513,20 +1528,21 @@ func verifyStorage(t testing.TB, s *memorySeriesStorage, samples model.Samples, t.Fatal(err) } p := s.NewPreloader() - p.PreloadRange(fp, sample.Timestamp, sample.Timestamp, time.Hour) - found := s.NewIterator(fp).ValueAtTime(sample.Timestamp) - if len(found) != 1 { - t.Errorf("Sample %#v: Expected exactly one value, found %d.", sample, len(found)) + it, err := p.PreloadRange(fp, sample.Timestamp, sample.Timestamp) + if err != nil { + t.Fatal(err) + } + found := it.ValueAtOrBeforeTime(sample.Timestamp) + if found.Timestamp == model.Earliest { + t.Errorf("Sample %#v: Expected sample not found.", sample) result = false p.Close() continue } - want := sample.Value - got := found[0].Value - if want != got || sample.Timestamp != found[0].Timestamp { + if sample.Value != found.Value || sample.Timestamp != found.Timestamp { t.Errorf( "Value (or timestamp) mismatch, want %f (at time %v), got %f (at time %v).", - want, sample.Timestamp, got, found[0].Timestamp, + sample.Value, sample.Timestamp, found.Value, found.Timestamp, ) result = false } @@ -1559,13 +1575,11 @@ func TestAppendOutOfOrder(t *testing.T) { pl := s.NewPreloader() defer pl.Close() - err = pl.PreloadRange(fp, 0, 2, 5*time.Minute) + it, err := pl.PreloadRange(fp, 0, 2) if err != nil { t.Fatalf("Error preloading chunks: %s", err) } - it := s.NewIterator(fp) - want := []model.SamplePair{ { Timestamp: 0, From d290340367e609d97b5b437a0be5e8f70de87515 Mon Sep 17 00:00:00 2001 From: beorn7 Date: Fri, 19 Feb 2016 12:24:29 +0100 Subject: [PATCH 2/5] Fix and improve chunkDesc locking --- storage/local/chunk.go | 71 +++++++++++++++++++++++++----------------- 1 file changed, 43 insertions(+), 28 deletions(-) diff --git a/storage/local/chunk.go b/storage/local/chunk.go index 6a63360946..ac54a5dc1d 100644 --- a/storage/local/chunk.go +++ b/storage/local/chunk.go @@ -54,9 +54,9 @@ const ( ) // chunkDesc contains meta-data for a chunk. Pay special attention to the -// documented requirements for calling its method (WRT pinning and locking). -// The doc comments spell out the requirements for each method, but here is an -// overview and general explanation: +// documented requirements for calling its method concurrently (WRT pinning and +// locking). The doc comments spell out the requirements for each method, but +// here is an overview and general explanation: // // Everything that changes the pinning of the underlying chunk or deals with its // eviction is protected by a mutex. This affects the following methods: pin, @@ -69,13 +69,25 @@ const ( // caller must make sure nobody else will call these methods concurrently, // either by holding the sole reference to the chunkDesc (usually during loading // or creation) or by locking the fingerprint of the series the chunkDesc -// belongs to. The affected methods are: add, lastTime, maybePopulateLastTime, -// lastSamplePair, setChunk. +// belongs to. The affected methods are: add, maybePopulateLastTime, setChunk. // -// Finally, there is the firstTime method. It merely returns the immutable -// chunkFirstTime member variable. It's arguably not needed and only there for -// consistency with lastTime. It can be called at any time and doesn't involve -// locking. +// Finally, there is the special cases firstTime and lastTime. lastTime requires +// to have locked the fingerprint of the series but the chunk does not need to +// be pinned. That's because the chunkLastTime field in chunkDesc gets populated +// upon completion of the chunk (when it is still pinned, and which happens +// while the series's fingerprint is locked). Once that has happened, calling +// lastTime does not require the chunk to be loaded anymore. Before that has +// happened, the chunk is pinned anyway. The chunkFirstTime field in chunkDesc +// is populated upon creation of a chunkDesc, so it is alway safe to call +// firstTime. The firstTime method is arguably not needed and only there for +// consistency with lastTime. +// +// Yet another (deprecated) case is lastSamplePair. It's used in federation and +// must be callable without pinning. Locking the fingerprint of the series is +// still required (to avoid concurrent appends to the chunk). The call is +// relatively expensive because of the required acquisition of the evict +// mutex. It will go away, though, once tracking the lastSamplePair has been +// moved into the series object. type chunkDesc struct { sync.Mutex // Protects pinning. c chunk // nil if chunk is evicted. @@ -104,8 +116,9 @@ func newChunkDesc(c chunk, firstTime model.Time) *chunkDesc { } } -// add adds a sample pair to the underlying chunk. The chunk must be pinned, and -// the caller must have locked the fingerprint of the series. +// add adds a sample pair to the underlying chunk. For safe concurrent access, +// The chunk must be pinned, and the caller must have locked the fingerprint of +// the series. func (cd *chunkDesc) add(s *model.SamplePair) []chunk { return cd.c.add(s) } @@ -143,7 +156,7 @@ func (cd *chunkDesc) unpin(evictRequests chan<- evictRequest) { } } -// refCount returns the number of pins. This method can be called concurrently +// refCount returns the number of pins. This method can be called concurrently // at any time. func (cd *chunkDesc) refCount() int { cd.Lock() @@ -160,10 +173,9 @@ func (cd *chunkDesc) firstTime() model.Time { return cd.chunkFirstTime } -// lastTime returns the timestamp of the last sample in the chunk. It must not -// be called concurrently with maybePopulateLastTime. If the chunkDesc is part -// of a memory series, this method requires the chunk to be pinned and the -// fingerprint of the time series to be locked. +// lastTime returns the timestamp of the last sample in the chunk. For safe +// concurrent access, this method requires the fingerprint of the time series to +// be locked. func (cd *chunkDesc) lastTime() model.Time { if cd.chunkLastTime != model.Earliest || cd.c == nil { return cd.chunkLastTime @@ -172,18 +184,24 @@ func (cd *chunkDesc) lastTime() model.Time { } // maybePopulateLastTime populates the chunkLastTime from the underlying chunk -// if it has not yet happened. The chunk must be pinned, and the caller must -// have locked the fingerprint of the series. This method must not be called -// concurrently with lastTime. +// if it has not yet happened. Call this method directly after having added the +// last sample to a chunk or after closing a head chunk due to age. For safe +// concurrent access, the chunk must be pinned, and the caller must have locked +// the fingerprint of the series. func (cd *chunkDesc) maybePopulateLastTime() { if cd.chunkLastTime == model.Earliest && cd.c != nil { cd.chunkLastTime = cd.c.newIterator().lastTimestamp() } } -// lastSamplePair returns the last sample pair of the underlying chunk. The -// chunk must be pinned. +// lastSamplePair returns the last sample pair of the underlying chunk, or nil +// if the chunk is evicted. For safe concurrent access, this method requires the +// fingerprint of the time series to be locked. +// TODO(beorn7): Move up into memorySeries. func (cd *chunkDesc) lastSamplePair() *model.SamplePair { + cd.Lock() + defer cd.Unlock() + if cd.c == nil { return nil } @@ -194,8 +212,8 @@ func (cd *chunkDesc) lastSamplePair() *model.SamplePair { } } -// isEvicted returns whether the chunk is evicted. The caller must have locked -// the fingerprint of the series. +// isEvicted returns whether the chunk is evicted. For safe concurrent access, +// the caller must have locked the fingerprint of the series. func (cd *chunkDesc) isEvicted() bool { // Locking required here because we do not want the caller to force // pinning the chunk first, so it could be evicted while this method is @@ -229,12 +247,9 @@ func (cd *chunkDesc) maybeEvict() bool { if cd.rCnt != 0 { return false } - // Last opportunity to populate chunkLastTime. This is a safety - // guard. Regularly, chunkLastTime should be populated upon completion - // of a chunk before persistence can kick to unpin it (and thereby - // making it evictable in the first place). if cd.chunkLastTime == model.Earliest { - cd.chunkLastTime = cd.c.newIterator().lastTimestamp() + // This must never happen. + panic("chunkLastTime not populated for evicted chunk") } cd.c = nil chunkOps.WithLabelValues(evict).Inc() From 28e9bbc15f570c607ccffc09f7e89266f4de7247 Mon Sep 17 00:00:00 2001 From: beorn7 Date: Wed, 24 Feb 2016 13:58:34 +0100 Subject: [PATCH 3/5] Populate chunkDesc.chunkLastTime during checkpoint loading, too --- storage/local/persistence.go | 20 ++++++---- storage/local/persistence_test.go | 61 +++++++++++++++++++++++++++++++ 2 files changed, 73 insertions(+), 8 deletions(-) diff --git a/storage/local/persistence.go b/storage/local/persistence.go index 8a1c178f5a..22c16eccd2 100644 --- a/storage/local/persistence.go +++ b/storage/local/persistence.go @@ -823,6 +823,7 @@ func (p *persistence) loadSeriesMapAndHeads() (sm *seriesMap, chunksToPersist in } } + headChunkClosed := true // Initial assumption. for i := int64(0); i < numChunkDescs; i++ { if i < persistWatermark { firstTime, err := binary.ReadVarint(r) @@ -844,6 +845,9 @@ func (p *persistence) loadSeriesMapAndHeads() (sm *seriesMap, chunksToPersist in chunkDescsTotal++ } else { // Non-persisted chunk. + // If there are non-persisted chunks at all, we consider + // the head chunk not to be closed yet. + headChunkClosed = false encoding, err := r.ReadByte() if err != nil { log.Warn("Could not decode chunk type:", err) @@ -856,17 +860,17 @@ func (p *persistence) loadSeriesMapAndHeads() (sm *seriesMap, chunksToPersist in p.dirty = true return sm, chunksToPersist, nil } - chunkDescs[i] = newChunkDesc(chunk, chunk.firstTime()) - chunksToPersist++ + cd := newChunkDesc(chunk, chunk.firstTime()) + if i < numChunkDescs-1 { + // This is NOT the head chunk. So it's a chunk + // to be persisted, and we need to populate lastTime. + chunksToPersist++ + cd.maybePopulateLastTime() + } + chunkDescs[i] = cd } } - headChunkClosed := persistWatermark >= numChunkDescs - if !headChunkClosed { - // Head chunk is not ready for persisting yet. - chunksToPersist-- - } - fingerprintToSeries[model.Fingerprint(fp)] = &memorySeries{ metric: model.Metric(metric), chunkDescs: chunkDescs, diff --git a/storage/local/persistence_test.go b/storage/local/persistence_test.go index fa3525f774..47cf9078a1 100644 --- a/storage/local/persistence_test.go +++ b/storage/local/persistence_test.go @@ -485,6 +485,12 @@ func testCheckpointAndLoadSeriesMapAndHeads(t *testing.T, encoding chunkEncoding if loadedS1.headChunkClosed { t.Error("headChunkClosed is true") } + if loadedS1.head().chunkFirstTime != 1 { + t.Errorf("want chunkFirstTime in head chunk to be 1, got %d", loadedS1.head().chunkFirstTime) + } + if loadedS1.head().chunkLastTime != model.Earliest { + t.Error("want chunkLastTime in head chunk to be unset") + } } else { t.Errorf("couldn't find %v in loaded map", m1) } @@ -501,6 +507,12 @@ func testCheckpointAndLoadSeriesMapAndHeads(t *testing.T, encoding chunkEncoding if !loadedS3.headChunkClosed { t.Error("headChunkClosed is false") } + if loadedS3.head().chunkFirstTime != 2 { + t.Errorf("want chunkFirstTime in head chunk to be 2, got %d", loadedS3.head().chunkFirstTime) + } + if loadedS3.head().chunkLastTime != 2 { + t.Errorf("want chunkLastTime in head chunk to be 2, got %d", loadedS3.head().chunkLastTime) + } } else { t.Errorf("couldn't find %v in loaded map", m3) } @@ -526,6 +538,27 @@ func testCheckpointAndLoadSeriesMapAndHeads(t *testing.T, encoding chunkEncoding if loadedS4.headChunkClosed { t.Error("headChunkClosed is true") } + for i, cd := range loadedS4.chunkDescs { + if cd.chunkFirstTime != cd.c.firstTime() { + t.Errorf( + "chunkDesc[%d]: chunkFirstTime not consistent with chunk, want %d, got %d", + i, cd.c.firstTime(), cd.chunkFirstTime, + ) + } + if i == len(loadedS4.chunkDescs)-1 { + // Head chunk. + if cd.chunkLastTime != model.Earliest { + t.Error("want chunkLastTime in head chunk to be unset") + } + continue + } + if cd.chunkLastTime != cd.c.newIterator().lastTimestamp() { + t.Errorf( + "chunkDesc[%d]: chunkLastTime not consistent with chunk, want %d, got %d", + i, cd.c.newIterator().lastTimestamp(), cd.chunkLastTime, + ) + } + } } else { t.Errorf("couldn't find %v in loaded map", m4) } @@ -551,6 +584,34 @@ func testCheckpointAndLoadSeriesMapAndHeads(t *testing.T, encoding chunkEncoding if loadedS5.headChunkClosed { t.Error("headChunkClosed is true") } + for i, cd := range loadedS5.chunkDescs { + if i < 3 { + // Evicted chunks. + if cd.chunkFirstTime == model.Earliest { + t.Errorf("chunkDesc[%d]: chunkLastTime not set", i) + } + continue + } + if cd.chunkFirstTime != cd.c.firstTime() { + t.Errorf( + "chunkDesc[%d]: chunkFirstTime not consistent with chunk, want %d, got %d", + i, cd.c.firstTime(), cd.chunkFirstTime, + ) + } + if i == len(loadedS5.chunkDescs)-1 { + // Head chunk. + if cd.chunkLastTime != model.Earliest { + t.Error("want chunkLastTime in head chunk to be unset") + } + continue + } + if cd.chunkLastTime != cd.c.newIterator().lastTimestamp() { + t.Errorf( + "chunkDesc[%d]: chunkLastTime not consistent with chunk, want %d, got %d", + i, cd.c.newIterator().lastTimestamp(), cd.chunkLastTime, + ) + } + } } else { t.Errorf("couldn't find %v in loaded map", m5) } From 2581648f70912f6af93a162b543e76749ce29b4a Mon Sep 17 00:00:00 2001 From: beorn7 Date: Wed, 2 Mar 2016 13:45:17 +0100 Subject: [PATCH 4/5] Separate iterators by offset Add test that exposes the problem. --- promql/analyzer.go | 12 +++++++----- storage/local/chunk.go | 6 +++--- storage/local/interface.go | 2 +- storage/local/preload.go | 2 +- 4 files changed, 12 insertions(+), 10 deletions(-) diff --git a/promql/analyzer.go b/promql/analyzer.go index 3bc46bb8f0..6e052656da 100644 --- a/promql/analyzer.go +++ b/promql/analyzer.go @@ -125,8 +125,10 @@ func (a *Analyzer) Prepare(ctx context.Context) (local.Preloader, error) { }() // Preload all analyzed ranges. - iters := map[model.Fingerprint]local.SeriesIterator{} + iters := map[time.Duration]map[model.Fingerprint]local.SeriesIterator{} for offset, pt := range a.offsetPreloadTimes { + itersForDuration := map[model.Fingerprint]local.SeriesIterator{} + iters[offset] = itersForDuration start := a.Start.Add(-offset) end := a.End.Add(-offset) for fp, rangeDuration := range pt.ranges { @@ -148,7 +150,7 @@ func (a *Analyzer) Prepare(ctx context.Context) (local.Preloader, error) { if err != nil { return nil, err } - iters[fp] = iter + itersForDuration[fp] = iter } for fp := range pt.instants { if err = contextDone(ctx, env); err != nil { @@ -161,7 +163,7 @@ func (a *Analyzer) Prepare(ctx context.Context) (local.Preloader, error) { if err != nil { return nil, err } - iters[fp] = iter + itersForDuration[fp] = iter } } @@ -170,11 +172,11 @@ func (a *Analyzer) Prepare(ctx context.Context) (local.Preloader, error) { switch n := node.(type) { case *VectorSelector: for fp := range n.metrics { - n.iterators[fp] = iters[fp] + n.iterators[fp] = iters[n.Offset][fp] } case *MatrixSelector: for fp := range n.metrics { - n.iterators[fp] = iters[fp] + n.iterators[fp] = iters[n.Offset][fp] } } return true diff --git a/storage/local/chunk.go b/storage/local/chunk.go index ac54a5dc1d..e0be10b142 100644 --- a/storage/local/chunk.go +++ b/storage/local/chunk.go @@ -54,7 +54,7 @@ const ( ) // chunkDesc contains meta-data for a chunk. Pay special attention to the -// documented requirements for calling its method concurrently (WRT pinning and +// documented requirements for calling its methods concurrently (WRT pinning and // locking). The doc comments spell out the requirements for each method, but // here is an overview and general explanation: // @@ -71,7 +71,7 @@ const ( // or creation) or by locking the fingerprint of the series the chunkDesc // belongs to. The affected methods are: add, maybePopulateLastTime, setChunk. // -// Finally, there is the special cases firstTime and lastTime. lastTime requires +// Finally, there are the special cases firstTime and lastTime. lastTime requires // to have locked the fingerprint of the series but the chunk does not need to // be pinned. That's because the chunkLastTime field in chunkDesc gets populated // upon completion of the chunk (when it is still pinned, and which happens @@ -292,7 +292,7 @@ type chunkIterator interface { // Gets the last sample value in the chunk. lastSampleValue() model.SampleValue // Gets the value that is closest before the given time. In case a value - // exist at precisely the given time, that value is returned. If no + // exists at precisely the given time, that value is returned. If no // applicable value exists, a SamplePair with timestamp model.Earliest // and value 0.0 is returned. valueAtOrBeforeTime(model.Time) model.SamplePair diff --git a/storage/local/interface.go b/storage/local/interface.go index a62a13b19a..005b397266 100644 --- a/storage/local/interface.go +++ b/storage/local/interface.go @@ -72,7 +72,7 @@ type Storage interface { // of the series prior the modification. type SeriesIterator interface { // Gets the value that is closest before the given time. In case a value - // exist at precisely the given time, that value is returned. If no + // exists at precisely the given time, that value is returned. If no // applicable value exists, a SamplePair with timestamp model.Earliest // and value 0.0 is returned. ValueAtOrBeforeTime(model.Time) model.SamplePair diff --git a/storage/local/preload.go b/storage/local/preload.go index cda04a864b..08a88875f7 100644 --- a/storage/local/preload.go +++ b/storage/local/preload.go @@ -28,7 +28,7 @@ func (p *memorySeriesPreloader) PreloadRange( ) (SeriesIterator, error) { cds, iter, err := p.storage.preloadChunksForRange(fp, from, through) if err != nil { - return iter, err + return nil, err } p.pinnedChunkDescs = append(p.pinnedChunkDescs, cds...) return iter, nil From 79a2ae2d2e512b5339bb929fef779be8ede2365e Mon Sep 17 00:00:00 2001 From: beorn7 Date: Wed, 2 Mar 2016 23:00:23 +0100 Subject: [PATCH 5/5] Add missing test file --- promql/testdata/selectors.test | 37 ++++++++++++++++++++++++++++++++++ 1 file changed, 37 insertions(+) create mode 100644 promql/testdata/selectors.test diff --git a/promql/testdata/selectors.test b/promql/testdata/selectors.test new file mode 100644 index 0000000000..f7eee977bc --- /dev/null +++ b/promql/testdata/selectors.test @@ -0,0 +1,37 @@ +load 10s + http_requests{job="api-server", instance="0", group="production"} 0+10x1000 100+30x1000 + http_requests{job="api-server", instance="1", group="production"} 0+20x1000 200+30x1000 + http_requests{job="api-server", instance="0", group="canary"} 0+30x1000 300+80x1000 + http_requests{job="api-server", instance="1", group="canary"} 0+40x2000 + +eval instant at 8000s rate(http_requests[1m]) + {job="api-server", instance="0", group="production"} 1 + {job="api-server", instance="1", group="production"} 2 + {job="api-server", instance="0", group="canary"} 3 + {job="api-server", instance="1", group="canary"} 4 + +eval instant at 18000s rate(http_requests[1m]) + {job="api-server", instance="0", group="production"} 3 + {job="api-server", instance="1", group="production"} 3 + {job="api-server", instance="0", group="canary"} 8 + {job="api-server", instance="1", group="canary"} 4 + +eval instant at 8000s rate(http_requests{group=~"pro.*"}[1m]) + {job="api-server", instance="0", group="production"} 1 + {job="api-server", instance="1", group="production"} 2 + +eval instant at 18000s rate(http_requests{group=~".*ry", instance="1"}[1m]) + {job="api-server", instance="1", group="canary"} 4 + +eval instant at 18000s rate(http_requests{instance!="3"}[1m] offset 10000s) + {job="api-server", instance="0", group="production"} 1 + {job="api-server", instance="1", group="production"} 2 + {job="api-server", instance="0", group="canary"} 3 + {job="api-server", instance="1", group="canary"} 4 + +eval instant at 18000s rate(http_requests[40s]) - rate(http_requests[1m] offset 10000s) + {job="api-server", instance="0", group="production"} 2 + {job="api-server", instance="1", group="production"} 1 + {job="api-server", instance="0", group="canary"} 5 + {job="api-server", instance="1", group="canary"} 0 +