diff --git a/promql/analyzer.go b/promql/analyzer.go index 32bdc64ed..b1695b7a4 100644 --- a/promql/analyzer.go +++ b/promql/analyzer.go @@ -73,7 +73,7 @@ func (a *Analyzer) Analyze(ctx context.Context) error { switch n := node.(type) { case *VectorSelector: pt := getPreloadTimes(n.Offset) - fpts := a.Storage.GetFingerprintsForLabelMatchers(n.LabelMatchers) + fpts := a.Storage.FingerprintsForLabelMatchers(n.LabelMatchers) n.fingerprints = fpts n.metrics = map[clientmodel.Fingerprint]clientmodel.COWMetric{} n.iterators = map[clientmodel.Fingerprint]local.SeriesIterator{} @@ -84,11 +84,11 @@ func (a *Analyzer) Analyze(ctx context.Context) error { if _, alreadyInRanges := pt.ranges[fp]; !alreadyInRanges { pt.instants[fp] = struct{}{} } - n.metrics[fp] = a.Storage.GetMetricForFingerprint(fp) + n.metrics[fp] = a.Storage.MetricForFingerprint(fp) } case *MatrixSelector: pt := getPreloadTimes(n.Offset) - fpts := a.Storage.GetFingerprintsForLabelMatchers(n.LabelMatchers) + fpts := a.Storage.FingerprintsForLabelMatchers(n.LabelMatchers) n.fingerprints = fpts n.metrics = map[clientmodel.Fingerprint]clientmodel.COWMetric{} n.iterators = map[clientmodel.Fingerprint]local.SeriesIterator{} @@ -100,7 +100,7 @@ func (a *Analyzer) Analyze(ctx context.Context) error { // an instant for the same fingerprint, should we have one. delete(pt.instants, fp) } - n.metrics[fp] = a.Storage.GetMetricForFingerprint(fp) + n.metrics[fp] = a.Storage.MetricForFingerprint(fp) } } return true diff --git a/promql/engine.go b/promql/engine.go index 08e49f25c..1b2dc2a89 100644 --- a/promql/engine.go +++ b/promql/engine.go @@ -644,7 +644,7 @@ func (ev *evaluator) eval(expr Expr) Value { func (ev *evaluator) vectorSelector(node *VectorSelector) Vector { vec := Vector{} for fp, it := range node.iterators { - sampleCandidates := it.GetValueAtTime(ev.Timestamp.Add(-node.Offset)) + sampleCandidates := it.ValueAtTime(ev.Timestamp.Add(-node.Offset)) samplePair := chooseClosestSample(sampleCandidates, ev.Timestamp.Add(-node.Offset)) if samplePair != nil { vec = append(vec, &Sample{ @@ -666,7 +666,7 @@ func (ev *evaluator) matrixSelector(node *MatrixSelector) Matrix { sampleStreams := make([]*SampleStream, 0, len(node.iterators)) for fp, it := range node.iterators { - samplePairs := it.GetRangeValues(interval) + samplePairs := it.RangeValues(interval) if len(samplePairs) == 0 { continue } @@ -695,7 +695,7 @@ func (ev *evaluator) matrixSelectorBounds(node *MatrixSelector) Matrix { sampleStreams := make([]*SampleStream, 0, len(node.iterators)) for fp, it := range node.iterators { - samplePairs := it.GetBoundaryValues(interval) + samplePairs := it.BoundaryValues(interval) if len(samplePairs) == 0 { continue } diff --git a/storage/local/chunk.go b/storage/local/chunk.go index e1bac7f8b..e911e4260 100644 --- a/storage/local/chunk.go +++ b/storage/local/chunk.go @@ -41,8 +41,8 @@ const ( // goroutine-safe proxies for chunk methods. type chunkDesc struct { sync.Mutex - chunk chunk // nil if chunk is evicted. - refCount int + c chunk // nil if chunk is evicted. + rCnt int chunkFirstTime clientmodel.Timestamp // Used if chunk is evicted. chunkLastTime clientmodel.Timestamp // Used if chunk is evicted. @@ -59,14 +59,14 @@ func newChunkDesc(c chunk) *chunkDesc { chunkOps.WithLabelValues(createAndPin).Inc() atomic.AddInt64(&numMemChunks, 1) numMemChunkDescs.Inc() - return &chunkDesc{chunk: c, refCount: 1} + return &chunkDesc{c: c, rCnt: 1} } func (cd *chunkDesc) add(s *metric.SamplePair) []chunk { cd.Lock() defer cd.Unlock() - return cd.chunk.add(s) + return cd.c.add(s) } // pin increments the refCount by one. Upon increment from 0 to 1, this @@ -76,11 +76,11 @@ func (cd *chunkDesc) pin(evictRequests chan<- evictRequest) { cd.Lock() defer cd.Unlock() - if cd.refCount == 0 { + if cd.rCnt == 0 { // Remove ourselves from the evict list. evictRequests <- evictRequest{cd, false} } - cd.refCount++ + cd.rCnt++ } // unpin decrements the refCount by one. Upon decrement from 1 to 0, this @@ -90,69 +90,69 @@ func (cd *chunkDesc) unpin(evictRequests chan<- evictRequest) { cd.Lock() defer cd.Unlock() - if cd.refCount == 0 { + if cd.rCnt == 0 { panic("cannot unpin already unpinned chunk") } - cd.refCount-- - if cd.refCount == 0 { + cd.rCnt-- + if cd.rCnt == 0 { // Add ourselves to the back of the evict list. evictRequests <- evictRequest{cd, true} } } -func (cd *chunkDesc) getRefCount() int { +func (cd *chunkDesc) refCount() int { cd.Lock() defer cd.Unlock() - return cd.refCount + return cd.rCnt } func (cd *chunkDesc) firstTime() clientmodel.Timestamp { cd.Lock() defer cd.Unlock() - if cd.chunk == nil { + if cd.c == nil { return cd.chunkFirstTime } - return cd.chunk.firstTime() + return cd.c.firstTime() } func (cd *chunkDesc) lastTime() clientmodel.Timestamp { cd.Lock() defer cd.Unlock() - if cd.chunk == nil { + if cd.c == nil { return cd.chunkLastTime } - return cd.chunk.lastTime() + return cd.c.newIterator().lastTimestamp() } func (cd *chunkDesc) isEvicted() bool { cd.Lock() defer cd.Unlock() - return cd.chunk == nil + return cd.c == nil } func (cd *chunkDesc) contains(t clientmodel.Timestamp) bool { return !t.Before(cd.firstTime()) && !t.After(cd.lastTime()) } -func (cd *chunkDesc) getChunk() chunk { +func (cd *chunkDesc) chunk() chunk { cd.Lock() defer cd.Unlock() - return cd.chunk + return cd.c } func (cd *chunkDesc) setChunk(c chunk) { cd.Lock() defer cd.Unlock() - if cd.chunk != nil { + if cd.c != nil { panic("chunk already set") } - cd.chunk = c + cd.c = c } // maybeEvict evicts the chunk if the refCount is 0. It returns whether the chunk @@ -162,15 +162,15 @@ func (cd *chunkDesc) maybeEvict() bool { cd.Lock() defer cd.Unlock() - if cd.chunk == nil { + if cd.c == nil { return true } - if cd.refCount != 0 { + if cd.rCnt != 0 { return false } - cd.chunkFirstTime = cd.chunk.firstTime() - cd.chunkLastTime = cd.chunk.lastTime() - cd.chunk = nil + cd.chunkFirstTime = cd.c.firstTime() + cd.chunkLastTime = cd.c.newIterator().lastTimestamp() + cd.c = nil chunkOps.WithLabelValues(evict).Inc() atomic.AddInt64(&numMemChunks, -1) return true @@ -188,12 +188,38 @@ type chunk interface { add(sample *metric.SamplePair) []chunk clone() chunk firstTime() clientmodel.Timestamp - lastTime() clientmodel.Timestamp newIterator() chunkIterator marshal(io.Writer) error unmarshal(io.Reader) error unmarshalFromBuf([]byte) encoding() chunkEncoding +} + +// A chunkIterator enables efficient access to the content of a chunk. It is +// generally not safe to use a chunkIterator concurrently with or after chunk +// mutation. +type chunkIterator interface { + // length returns the number of samples in the chunk. + length() int + // Gets the timestamp of the n-th sample in the chunk. + timestampAtIndex(int) clientmodel.Timestamp + // Gets the last timestamp in the chunk. + lastTimestamp() clientmodel.Timestamp + // Gets the sample value of the n-th sample in the chunk. + sampleValueAtIndex(int) clientmodel.SampleValue + // Gets the last sample value in the chunk. + lastSampleValue() clientmodel.SampleValue + // Gets the two values that are immediately adjacent to a given time. In + // case a value exist at precisely the given time, only that single + // value is returned. Only the first or last value is returned (as a + // single value), if the given time is before or after the first or last + // value, respectively. + valueAtTime(clientmodel.Timestamp) metric.Values + // Gets all values contained within a given interval. + rangeValues(metric.Interval) metric.Values + // Whether a given timestamp is contained between first and last value + // in the chunk. + contains(clientmodel.Timestamp) bool // values returns a channel, from which all sample values in the chunk // can be received in order. The channel is closed after the last // one. It is generally not safe to mutate the chunk while the channel @@ -201,29 +227,12 @@ type chunk interface { values() <-chan *metric.SamplePair } -// A chunkIterator enables efficient access to the content of a chunk. It is -// generally not safe to use a chunkIterator concurrently with or after chunk -// mutation. -type chunkIterator interface { - // Gets the two values that are immediately adjacent to a given time. In - // case a value exist at precisely the given time, only that single - // value is returned. Only the first or last value is returned (as a - // single value), if the given time is before or after the first or last - // value, respectively. - getValueAtTime(clientmodel.Timestamp) metric.Values - // Gets all values contained within a given interval. - getRangeValues(metric.Interval) metric.Values - // Whether a given timestamp is contained between first and last value - // in the chunk. - contains(clientmodel.Timestamp) bool -} - func transcodeAndAdd(dst chunk, src chunk, s *metric.SamplePair) []chunk { chunkOps.WithLabelValues(transcode).Inc() head := dst body := []chunk{} - for v := range src.values() { + for v := range src.newIterator().values() { newChunks := head.add(v) body = append(body, newChunks[:len(newChunks)-1]...) head = newChunks[len(newChunks)-1] diff --git a/storage/local/crashrecovery.go b/storage/local/crashrecovery.go index 25f656666..d52bdfcb0 100644 --- a/storage/local/crashrecovery.go +++ b/storage/local/crashrecovery.go @@ -318,7 +318,7 @@ func (p *persistence) sanitizeSeries( return fp, true } // This series is supposed to be archived. - metric, err := p.getArchivedMetric(fp) + metric, err := p.archivedMetric(fp) if err != nil { log.Errorf( "Fingerprint %v assumed archived but couldn't be looked up in archived index: %s", diff --git a/storage/local/delta.go b/storage/local/delta.go index 1be169feb..9e04890fe 100644 --- a/storage/local/delta.go +++ b/storage/local/delta.go @@ -188,18 +188,19 @@ func (c deltaEncodedChunk) clone() chunk { // firstTime implements chunk. func (c deltaEncodedChunk) firstTime() clientmodel.Timestamp { - return c.valueAtIndex(0).Timestamp -} - -// lastTime implements chunk. -func (c deltaEncodedChunk) lastTime() clientmodel.Timestamp { - return c.valueAtIndex(c.len() - 1).Timestamp + return c.baseTime() } // newIterator implements chunk. func (c *deltaEncodedChunk) newIterator() chunkIterator { return &deltaEncodedChunkIterator{ - chunk: c, + c: *c, + len: c.len(), + baseT: c.baseTime(), + baseV: c.baseValue(), + tBytes: c.timeBytes(), + vBytes: c.valueBytes(), + isInt: c.isInt(), } } @@ -237,19 +238,6 @@ func (c *deltaEncodedChunk) unmarshalFromBuf(buf []byte) { *c = (*c)[:binary.LittleEndian.Uint16((*c)[deltaHeaderBufLenOffset:])] } -// values implements chunk. -func (c deltaEncodedChunk) values() <-chan *metric.SamplePair { - n := c.len() - valuesChan := make(chan *metric.SamplePair) - go func() { - for i := 0; i < n; i++ { - valuesChan <- c.valueAtIndex(i) - } - close(valuesChan) - }() - return valuesChan -} - // encoding implements chunk. func (c deltaEncodedChunk) encoding() chunkEncoding { return delta } @@ -284,106 +272,157 @@ func (c deltaEncodedChunk) len() int { return (len(c) - deltaHeaderBytes) / c.sampleSize() } -func (c deltaEncodedChunk) valueAtIndex(idx int) *metric.SamplePair { - offset := deltaHeaderBytes + idx*c.sampleSize() - - var ts clientmodel.Timestamp - switch c.timeBytes() { - case d1: - ts = c.baseTime() + clientmodel.Timestamp(uint8(c[offset])) - case d2: - ts = c.baseTime() + clientmodel.Timestamp(binary.LittleEndian.Uint16(c[offset:])) - case d4: - ts = c.baseTime() + clientmodel.Timestamp(binary.LittleEndian.Uint32(c[offset:])) - case d8: - // Take absolute value for d8. - ts = clientmodel.Timestamp(binary.LittleEndian.Uint64(c[offset:])) - default: - panic("Invalid number of bytes for time delta") - } - - offset += int(c.timeBytes()) - - var v clientmodel.SampleValue - if c.isInt() { - switch c.valueBytes() { - case d0: - v = c.baseValue() - case d1: - v = c.baseValue() + clientmodel.SampleValue(int8(c[offset])) - case d2: - v = c.baseValue() + clientmodel.SampleValue(int16(binary.LittleEndian.Uint16(c[offset:]))) - case d4: - v = c.baseValue() + clientmodel.SampleValue(int32(binary.LittleEndian.Uint32(c[offset:]))) - // No d8 for ints. - default: - panic("Invalid number of bytes for integer delta") - } - } else { - switch c.valueBytes() { - case d4: - v = c.baseValue() + clientmodel.SampleValue(math.Float32frombits(binary.LittleEndian.Uint32(c[offset:]))) - case d8: - // Take absolute value for d8. - v = clientmodel.SampleValue(math.Float64frombits(binary.LittleEndian.Uint64(c[offset:]))) - default: - panic("Invalid number of bytes for floating point delta") - } - } - return &metric.SamplePair{ - Timestamp: ts, - Value: v, - } -} - // deltaEncodedChunkIterator implements chunkIterator. type deltaEncodedChunkIterator struct { - chunk *deltaEncodedChunk - // TODO: add more fields here to keep track of last position. + c deltaEncodedChunk + len int + baseT clientmodel.Timestamp + baseV clientmodel.SampleValue + tBytes, vBytes deltaBytes + isInt bool } -// getValueAtTime implements chunkIterator. -func (it *deltaEncodedChunkIterator) getValueAtTime(t clientmodel.Timestamp) metric.Values { - i := sort.Search(it.chunk.len(), func(i int) bool { - return !it.chunk.valueAtIndex(i).Timestamp.Before(t) +// length implements chunkIterator. +func (it *deltaEncodedChunkIterator) length() int { return it.len } + +// valueAtTime implements chunkIterator. +func (it *deltaEncodedChunkIterator) valueAtTime(t clientmodel.Timestamp) metric.Values { + i := sort.Search(it.len, func(i int) bool { + return !it.timestampAtIndex(i).Before(t) }) switch i { case 0: - return metric.Values{*it.chunk.valueAtIndex(0)} - case it.chunk.len(): - return metric.Values{*it.chunk.valueAtIndex(it.chunk.len() - 1)} + return metric.Values{metric.SamplePair{ + Timestamp: it.timestampAtIndex(0), + Value: it.sampleValueAtIndex(0), + }} + case it.len: + return metric.Values{metric.SamplePair{ + Timestamp: it.timestampAtIndex(it.len - 1), + Value: it.sampleValueAtIndex(it.len - 1), + }} default: - v := it.chunk.valueAtIndex(i) - if v.Timestamp.Equal(t) { - return metric.Values{*v} + ts := it.timestampAtIndex(i) + if ts.Equal(t) { + return metric.Values{metric.SamplePair{ + Timestamp: ts, + Value: it.sampleValueAtIndex(i), + }} + } + return metric.Values{ + metric.SamplePair{ + Timestamp: it.timestampAtIndex(i - 1), + Value: it.sampleValueAtIndex(i - 1), + }, + metric.SamplePair{ + Timestamp: ts, + Value: it.sampleValueAtIndex(i), + }, } - return metric.Values{*it.chunk.valueAtIndex(i - 1), *v} } } -// getRangeValues implements chunkIterator. -func (it *deltaEncodedChunkIterator) getRangeValues(in metric.Interval) metric.Values { - oldest := sort.Search(it.chunk.len(), func(i int) bool { - return !it.chunk.valueAtIndex(i).Timestamp.Before(in.OldestInclusive) +// rangeValues implements chunkIterator. +func (it *deltaEncodedChunkIterator) rangeValues(in metric.Interval) metric.Values { + oldest := sort.Search(it.len, func(i int) bool { + return !it.timestampAtIndex(i).Before(in.OldestInclusive) }) - newest := sort.Search(it.chunk.len(), func(i int) bool { - return it.chunk.valueAtIndex(i).Timestamp.After(in.NewestInclusive) + newest := sort.Search(it.len, func(i int) bool { + return it.timestampAtIndex(i).After(in.NewestInclusive) }) - if oldest == it.chunk.len() { + if oldest == it.len { return nil } result := make(metric.Values, 0, newest-oldest) for i := oldest; i < newest; i++ { - result = append(result, *it.chunk.valueAtIndex(i)) + result = append(result, metric.SamplePair{ + Timestamp: it.timestampAtIndex(i), + Value: it.sampleValueAtIndex(i), + }) } return result } // contains implements chunkIterator. func (it *deltaEncodedChunkIterator) contains(t clientmodel.Timestamp) bool { - return !t.Before(it.chunk.firstTime()) && !t.After(it.chunk.lastTime()) + return !t.Before(it.baseT) && !t.After(it.timestampAtIndex(it.len-1)) +} + +// values implements chunkIterator. +func (it *deltaEncodedChunkIterator) values() <-chan *metric.SamplePair { + valuesChan := make(chan *metric.SamplePair) + go func() { + for i := 0; i < it.len; i++ { + valuesChan <- &metric.SamplePair{ + Timestamp: it.timestampAtIndex(i), + Value: it.sampleValueAtIndex(i), + } + } + close(valuesChan) + }() + return valuesChan +} + +// timestampAtIndex implements chunkIterator. +func (it *deltaEncodedChunkIterator) timestampAtIndex(idx int) clientmodel.Timestamp { + offset := deltaHeaderBytes + idx*int(it.tBytes+it.vBytes) + + switch it.tBytes { + case d1: + return it.baseT + clientmodel.Timestamp(uint8(it.c[offset])) + case d2: + return it.baseT + clientmodel.Timestamp(binary.LittleEndian.Uint16(it.c[offset:])) + case d4: + return it.baseT + clientmodel.Timestamp(binary.LittleEndian.Uint32(it.c[offset:])) + case d8: + // Take absolute value for d8. + return clientmodel.Timestamp(binary.LittleEndian.Uint64(it.c[offset:])) + default: + panic("Invalid number of bytes for time delta") + } +} + +// lastTimestamp implements chunkIterator. +func (it *deltaEncodedChunkIterator) lastTimestamp() clientmodel.Timestamp { + return it.timestampAtIndex(it.len - 1) +} + +// sampleValueAtIndex implements chunkIterator. +func (it *deltaEncodedChunkIterator) sampleValueAtIndex(idx int) clientmodel.SampleValue { + offset := deltaHeaderBytes + idx*int(it.tBytes+it.vBytes) + int(it.tBytes) + + if it.isInt { + switch it.vBytes { + case d0: + return it.baseV + case d1: + return it.baseV + clientmodel.SampleValue(int8(it.c[offset])) + case d2: + return it.baseV + clientmodel.SampleValue(int16(binary.LittleEndian.Uint16(it.c[offset:]))) + case d4: + return it.baseV + clientmodel.SampleValue(int32(binary.LittleEndian.Uint32(it.c[offset:]))) + // No d8 for ints. + default: + panic("Invalid number of bytes for integer delta") + } + } else { + switch it.vBytes { + case d4: + return it.baseV + clientmodel.SampleValue(math.Float32frombits(binary.LittleEndian.Uint32(it.c[offset:]))) + case d8: + // Take absolute value for d8. + return clientmodel.SampleValue(math.Float64frombits(binary.LittleEndian.Uint64(it.c[offset:]))) + default: + panic("Invalid number of bytes for floating point delta") + } + } +} + +// lastSampleValue implements chunkIterator. +func (it *deltaEncodedChunkIterator) lastSampleValue() clientmodel.SampleValue { + return it.sampleValueAtIndex(it.len - 1) } diff --git a/storage/local/doubledelta.go b/storage/local/doubledelta.go index dcfd155ed..17e2aa5f6 100644 --- a/storage/local/doubledelta.go +++ b/storage/local/doubledelta.go @@ -199,15 +199,18 @@ func (c doubleDeltaEncodedChunk) firstTime() clientmodel.Timestamp { return c.baseTime() } -// lastTime implements chunk. -func (c doubleDeltaEncodedChunk) lastTime() clientmodel.Timestamp { - return c.valueAtIndex(c.len() - 1).Timestamp -} - // newIterator implements chunk. func (c *doubleDeltaEncodedChunk) newIterator() chunkIterator { return &doubleDeltaEncodedChunkIterator{ - chunk: c, + c: *c, + len: c.len(), + baseT: c.baseTime(), + baseΔT: c.baseTimeDelta(), + baseV: c.baseValue(), + baseΔV: c.baseValueDelta(), + tBytes: c.timeBytes(), + vBytes: c.valueBytes(), + isInt: c.isInt(), } } @@ -245,19 +248,6 @@ func (c *doubleDeltaEncodedChunk) unmarshalFromBuf(buf []byte) { *c = (*c)[:binary.LittleEndian.Uint16((*c)[doubleDeltaHeaderBufLenOffset:])] } -// values implements chunk. -func (c doubleDeltaEncodedChunk) values() <-chan *metric.SamplePair { - n := c.len() - valuesChan := make(chan *metric.SamplePair) - go func() { - for i := 0; i < n; i++ { - valuesChan <- c.valueAtIndex(i) - } - close(valuesChan) - }() - return valuesChan -} - // encoding implements chunk. func (c doubleDeltaEncodedChunk) encoding() chunkEncoding { return doubleDelta } @@ -280,6 +270,9 @@ func (c doubleDeltaEncodedChunk) baseValue() clientmodel.SampleValue { } func (c doubleDeltaEncodedChunk) baseTimeDelta() clientmodel.Timestamp { + if len(c) < doubleDeltaHeaderBaseTimeDeltaOffset+8 { + return 0 + } return clientmodel.Timestamp( binary.LittleEndian.Uint64( c[doubleDeltaHeaderBaseTimeDeltaOffset:], @@ -288,6 +281,9 @@ func (c doubleDeltaEncodedChunk) baseTimeDelta() clientmodel.Timestamp { } func (c doubleDeltaEncodedChunk) baseValueDelta() clientmodel.SampleValue { + if len(c) < doubleDeltaHeaderBaseValueDeltaOffset+8 { + return 0 + } return clientmodel.SampleValue( math.Float64frombits( binary.LittleEndian.Uint64( @@ -387,147 +383,196 @@ func (c doubleDeltaEncodedChunk) addSecondSample(s *metric.SamplePair, tb, vb de return []chunk{&c} } -func (c doubleDeltaEncodedChunk) valueAtIndex(idx int) *metric.SamplePair { - if idx == 0 { - return &metric.SamplePair{ - Timestamp: c.baseTime(), - Value: c.baseValue(), - } - } - if idx == 1 { - // If time and/or value bytes are at d8, the time and value is - // saved directly rather than as a difference. - timestamp := c.baseTimeDelta() - if c.timeBytes() < d8 { - timestamp += c.baseTime() - } - value := c.baseValueDelta() - if c.valueBytes() < d8 { - value += c.baseValue() - } - return &metric.SamplePair{ - Timestamp: timestamp, - Value: value, - } - } - offset := doubleDeltaHeaderBytes + (idx-2)*c.sampleSize() - - var ts clientmodel.Timestamp - switch c.timeBytes() { - case d1: - ts = c.baseTime() + - clientmodel.Timestamp(idx)*c.baseTimeDelta() + - clientmodel.Timestamp(int8(c[offset])) - case d2: - ts = c.baseTime() + - clientmodel.Timestamp(idx)*c.baseTimeDelta() + - clientmodel.Timestamp(int16(binary.LittleEndian.Uint16(c[offset:]))) - case d4: - ts = c.baseTime() + - clientmodel.Timestamp(idx)*c.baseTimeDelta() + - clientmodel.Timestamp(int32(binary.LittleEndian.Uint32(c[offset:]))) - case d8: - // Take absolute value for d8. - ts = clientmodel.Timestamp(binary.LittleEndian.Uint64(c[offset:])) - default: - panic("Invalid number of bytes for time delta") - } - - offset += int(c.timeBytes()) - - var v clientmodel.SampleValue - if c.isInt() { - switch c.valueBytes() { - case d0: - v = c.baseValue() + - clientmodel.SampleValue(idx)*c.baseValueDelta() - case d1: - v = c.baseValue() + - clientmodel.SampleValue(idx)*c.baseValueDelta() + - clientmodel.SampleValue(int8(c[offset])) - case d2: - v = c.baseValue() + - clientmodel.SampleValue(idx)*c.baseValueDelta() + - clientmodel.SampleValue(int16(binary.LittleEndian.Uint16(c[offset:]))) - case d4: - v = c.baseValue() + - clientmodel.SampleValue(idx)*c.baseValueDelta() + - clientmodel.SampleValue(int32(binary.LittleEndian.Uint32(c[offset:]))) - // No d8 for ints. - default: - panic("Invalid number of bytes for integer delta") - } - } else { - switch c.valueBytes() { - case d4: - v = c.baseValue() + - clientmodel.SampleValue(idx)*c.baseValueDelta() + - clientmodel.SampleValue(math.Float32frombits(binary.LittleEndian.Uint32(c[offset:]))) - case d8: - // Take absolute value for d8. - v = clientmodel.SampleValue(math.Float64frombits(binary.LittleEndian.Uint64(c[offset:]))) - default: - panic("Invalid number of bytes for floating point delta") - } - } - return &metric.SamplePair{ - Timestamp: ts, - Value: v, - } -} - // doubleDeltaEncodedChunkIterator implements chunkIterator. type doubleDeltaEncodedChunkIterator struct { - chunk *doubleDeltaEncodedChunk - // TODO(beorn7): add more fields here to keep track of last position. + c doubleDeltaEncodedChunk + len int + baseT, baseΔT clientmodel.Timestamp + baseV, baseΔV clientmodel.SampleValue + tBytes, vBytes deltaBytes + isInt bool } -// getValueAtTime implements chunkIterator. -func (it *doubleDeltaEncodedChunkIterator) getValueAtTime(t clientmodel.Timestamp) metric.Values { - // TODO(beorn7): Implement in a more efficient way making use of the - // state of the iterator and internals of the doubleDeltaChunk. - i := sort.Search(it.chunk.len(), func(i int) bool { - return !it.chunk.valueAtIndex(i).Timestamp.Before(t) +// length implements chunkIterator. +func (it *doubleDeltaEncodedChunkIterator) length() int { return it.len } + +// valueAtTime implements chunkIterator. +func (it *doubleDeltaEncodedChunkIterator) valueAtTime(t clientmodel.Timestamp) metric.Values { + i := sort.Search(it.len, func(i int) bool { + return !it.timestampAtIndex(i).Before(t) }) switch i { case 0: - return metric.Values{*it.chunk.valueAtIndex(0)} - case it.chunk.len(): - return metric.Values{*it.chunk.valueAtIndex(it.chunk.len() - 1)} + return metric.Values{metric.SamplePair{ + Timestamp: it.timestampAtIndex(0), + Value: it.sampleValueAtIndex(0), + }} + case it.len: + return metric.Values{metric.SamplePair{ + Timestamp: it.timestampAtIndex(it.len - 1), + Value: it.sampleValueAtIndex(it.len - 1), + }} default: - v := it.chunk.valueAtIndex(i) - if v.Timestamp.Equal(t) { - return metric.Values{*v} + ts := it.timestampAtIndex(i) + if ts.Equal(t) { + return metric.Values{metric.SamplePair{ + Timestamp: ts, + Value: it.sampleValueAtIndex(i), + }} + } + return metric.Values{ + metric.SamplePair{ + Timestamp: it.timestampAtIndex(i - 1), + Value: it.sampleValueAtIndex(i - 1), + }, + metric.SamplePair{ + Timestamp: ts, + Value: it.sampleValueAtIndex(i), + }, } - return metric.Values{*it.chunk.valueAtIndex(i - 1), *v} } } -// getRangeValues implements chunkIterator. -func (it *doubleDeltaEncodedChunkIterator) getRangeValues(in metric.Interval) metric.Values { - // TODO(beorn7): Implement in a more efficient way making use of the - // state of the iterator and internals of the doubleDeltaChunk. - oldest := sort.Search(it.chunk.len(), func(i int) bool { - return !it.chunk.valueAtIndex(i).Timestamp.Before(in.OldestInclusive) +// rangeValues implements chunkIterator. +func (it *doubleDeltaEncodedChunkIterator) rangeValues(in metric.Interval) metric.Values { + oldest := sort.Search(it.len, func(i int) bool { + return !it.timestampAtIndex(i).Before(in.OldestInclusive) }) - newest := sort.Search(it.chunk.len(), func(i int) bool { - return it.chunk.valueAtIndex(i).Timestamp.After(in.NewestInclusive) + newest := sort.Search(it.len, func(i int) bool { + return it.timestampAtIndex(i).After(in.NewestInclusive) }) - if oldest == it.chunk.len() { + if oldest == it.len { return nil } result := make(metric.Values, 0, newest-oldest) for i := oldest; i < newest; i++ { - result = append(result, *it.chunk.valueAtIndex(i)) + result = append(result, metric.SamplePair{ + Timestamp: it.timestampAtIndex(i), + Value: it.sampleValueAtIndex(i), + }) } return result } // contains implements chunkIterator. func (it *doubleDeltaEncodedChunkIterator) contains(t clientmodel.Timestamp) bool { - return !t.Before(it.chunk.firstTime()) && !t.After(it.chunk.lastTime()) + return !t.Before(it.baseT) && !t.After(it.timestampAtIndex(it.len-1)) +} + +// values implements chunkIterator. +func (it *doubleDeltaEncodedChunkIterator) values() <-chan *metric.SamplePair { + valuesChan := make(chan *metric.SamplePair) + go func() { + for i := 0; i < it.len; i++ { + valuesChan <- &metric.SamplePair{ + Timestamp: it.timestampAtIndex(i), + Value: it.sampleValueAtIndex(i), + } + } + close(valuesChan) + }() + return valuesChan +} + +// timestampAtIndex implements chunkIterator. +func (it *doubleDeltaEncodedChunkIterator) timestampAtIndex(idx int) clientmodel.Timestamp { + if idx == 0 { + return it.baseT + } + if idx == 1 { + // If time bytes are at d8, the time is saved directly rather + // than as a difference. + if it.tBytes == d8 { + return it.baseΔT + } + return it.baseT + it.baseΔT + } + + offset := doubleDeltaHeaderBytes + (idx-2)*int(it.tBytes+it.vBytes) + + switch it.tBytes { + case d1: + return it.baseT + + clientmodel.Timestamp(idx)*it.baseΔT + + clientmodel.Timestamp(int8(it.c[offset])) + case d2: + return it.baseT + + clientmodel.Timestamp(idx)*it.baseΔT + + clientmodel.Timestamp(int16(binary.LittleEndian.Uint16(it.c[offset:]))) + case d4: + return it.baseT + + clientmodel.Timestamp(idx)*it.baseΔT + + clientmodel.Timestamp(int32(binary.LittleEndian.Uint32(it.c[offset:]))) + case d8: + // Take absolute value for d8. + return clientmodel.Timestamp(binary.LittleEndian.Uint64(it.c[offset:])) + default: + panic("Invalid number of bytes for time delta") + } +} + +// lastTimestamp implements chunkIterator. +func (it *doubleDeltaEncodedChunkIterator) lastTimestamp() clientmodel.Timestamp { + return it.timestampAtIndex(it.len - 1) +} + +// sampleValueAtIndex implements chunkIterator. +func (it *doubleDeltaEncodedChunkIterator) sampleValueAtIndex(idx int) clientmodel.SampleValue { + if idx == 0 { + return it.baseV + } + if idx == 1 { + // If value bytes are at d8, the value is saved directly rather + // than as a difference. + if it.vBytes == d8 { + return it.baseΔV + } + return it.baseV + it.baseΔV + } + + offset := doubleDeltaHeaderBytes + (idx-2)*int(it.tBytes+it.vBytes) + int(it.tBytes) + + if it.isInt { + switch it.vBytes { + case d0: + return it.baseV + + clientmodel.SampleValue(idx)*it.baseΔV + case d1: + return it.baseV + + clientmodel.SampleValue(idx)*it.baseΔV + + clientmodel.SampleValue(int8(it.c[offset])) + case d2: + return it.baseV + + clientmodel.SampleValue(idx)*it.baseΔV + + clientmodel.SampleValue(int16(binary.LittleEndian.Uint16(it.c[offset:]))) + case d4: + return it.baseV + + clientmodel.SampleValue(idx)*it.baseΔV + + clientmodel.SampleValue(int32(binary.LittleEndian.Uint32(it.c[offset:]))) + // No d8 for ints. + default: + panic("Invalid number of bytes for integer delta") + } + } else { + switch it.vBytes { + case d4: + return it.baseV + + clientmodel.SampleValue(idx)*it.baseΔV + + clientmodel.SampleValue(math.Float32frombits(binary.LittleEndian.Uint32(it.c[offset:]))) + case d8: + // Take absolute value for d8. + return clientmodel.SampleValue(math.Float64frombits(binary.LittleEndian.Uint64(it.c[offset:]))) + default: + panic("Invalid number of bytes for floating point delta") + } + } +} + +// lastSampleValue implements chunkIterator. +func (it *doubleDeltaEncodedChunkIterator) lastSampleValue() clientmodel.SampleValue { + return it.sampleValueAtIndex(it.len - 1) } diff --git a/storage/local/interface.go b/storage/local/interface.go index 60df2d2f7..54885b0e2 100644 --- a/storage/local/interface.go +++ b/storage/local/interface.go @@ -38,11 +38,11 @@ type Storage interface { NewPreloader() Preloader // Get all of the metric fingerprints that are associated with the // provided label matchers. - GetFingerprintsForLabelMatchers(metric.LabelMatchers) clientmodel.Fingerprints + FingerprintsForLabelMatchers(metric.LabelMatchers) clientmodel.Fingerprints // Get all of the label values that are associated with a given label name. - GetLabelValuesForLabelName(clientmodel.LabelName) clientmodel.LabelValues + LabelValuesForLabelName(clientmodel.LabelName) clientmodel.LabelValues // Get the metric associated with the provided fingerprint. - GetMetricForFingerprint(clientmodel.Fingerprint) clientmodel.COWMetric + MetricForFingerprint(clientmodel.Fingerprint) clientmodel.COWMetric // Construct an iterator for a given fingerprint. NewIterator(clientmodel.Fingerprint) SeriesIterator // Run the various maintenance loops in goroutines. Returns when the @@ -53,28 +53,28 @@ type Storage interface { // operations, stops all maintenance loops,and frees all resources. Stop() error // WaitForIndexing returns once all samples in the storage are - // indexed. Indexing is needed for GetFingerprintsForLabelMatchers and - // GetLabelValuesForLabelName and may lag behind. + // indexed. Indexing is needed for FingerprintsForLabelMatchers and + // LabelValuesForLabelName and may lag behind. WaitForIndexing() } -// SeriesIterator enables efficient access of sample values in a series. All -// methods are goroutine-safe. A SeriesIterator iterates over a snapshot of a -// series, i.e. it is safe to continue using a SeriesIterator after modifying -// the corresponding series, but the iterator will represent the state of the -// series prior the modification. +// SeriesIterator enables efficient access of sample values in a series. Its +// methods are not goroutine-safe. A SeriesIterator iterates over a snapshot of +// a series, i.e. it is safe to continue using a SeriesIterator after or during +// modifying the corresponding series, but the iterator will represent the state +// of the series prior the modification. type SeriesIterator interface { // Gets the two values that are immediately adjacent to a given time. In // case a value exist at precisely the given time, only that single // value is returned. Only the first or last value is returned (as a // single value), if the given time is before or after the first or last // value, respectively. - GetValueAtTime(clientmodel.Timestamp) metric.Values + ValueAtTime(clientmodel.Timestamp) metric.Values // Gets the boundary values of an interval: the first and last value // within a given interval. - GetBoundaryValues(metric.Interval) metric.Values + BoundaryValues(metric.Interval) metric.Values // Gets all values contained within a given interval. - GetRangeValues(metric.Interval) metric.Values + RangeValues(metric.Interval) metric.Values } // A Preloader preloads series data necessary for a query into memory and pins diff --git a/storage/local/mapper.go b/storage/local/mapper.go index 5fcb5cbfe..0dca894fa 100644 --- a/storage/local/mapper.go +++ b/storage/local/mapper.go @@ -93,7 +93,7 @@ func (r *fpMapper) mapFP(fp clientmodel.Fingerprint, m clientmodel.Metric) (clie // If we are here, FP does not exist in memory and is either not mapped // at all, or existing mappings for FP are not for m. Check if we have // something for FP in the archive. - archivedMetric, err := r.p.getArchivedMetric(fp) + archivedMetric, err := r.p.archivedMetric(fp) if err != nil { return fp, err } diff --git a/storage/local/persistence.go b/storage/local/persistence.go index b4828a8b8..c2fc3d034 100644 --- a/storage/local/persistence.go +++ b/storage/local/persistence.go @@ -319,11 +319,11 @@ func (p *persistence) setDirty(dirty bool) { } } -// getFingerprintsForLabelPair returns the fingerprints for the given label +// fingerprintsForLabelPair returns the fingerprints for the given label // pair. This method is goroutine-safe but take into account that metrics queued // for indexing with IndexMetric might not have made it into the index // yet. (Same applies correspondingly to UnindexMetric.) -func (p *persistence) getFingerprintsForLabelPair(lp metric.LabelPair) (clientmodel.Fingerprints, error) { +func (p *persistence) fingerprintsForLabelPair(lp metric.LabelPair) (clientmodel.Fingerprints, error) { fps, _, err := p.labelPairToFingerprints.Lookup(lp) if err != nil { return nil, err @@ -331,11 +331,11 @@ func (p *persistence) getFingerprintsForLabelPair(lp metric.LabelPair) (clientmo return fps, nil } -// getLabelValuesForLabelName returns the label values for the given label +// labelValuesForLabelName returns the label values for the given label // name. This method is goroutine-safe but take into account that metrics queued // for indexing with IndexMetric might not have made it into the index // yet. (Same applies correspondingly to UnindexMetric.) -func (p *persistence) getLabelValuesForLabelName(ln clientmodel.LabelName) (clientmodel.LabelValues, error) { +func (p *persistence) labelValuesForLabelName(ln clientmodel.LabelName) (clientmodel.LabelValues, error) { lvs, _, err := p.labelNameToLabelValues.Lookup(ln) if err != nil { return nil, err @@ -632,10 +632,10 @@ func (p *persistence) checkpointSeriesMapAndHeads(fingerprintToSeries *seriesMap } } else { // This is the non-persisted head chunk. Fully marshal it. - if err = w.WriteByte(byte(chunkDesc.chunk.encoding())); err != nil { + if err = w.WriteByte(byte(chunkDesc.c.encoding())); err != nil { return } - if err = chunkDesc.chunk.marshal(w); err != nil { + if err = chunkDesc.c.marshal(w); err != nil { return } } @@ -880,7 +880,7 @@ func (p *persistence) dropAndPersistChunks( // too old. If that's the case, the chunks in the series file // are all too old, too. i := 0 - for ; i < len(chunks) && chunks[i].lastTime().Before(beforeTime); i++ { + for ; i < len(chunks) && chunks[i].newIterator().lastTimestamp().Before(beforeTime); i++ { } if i < len(chunks) { firstTimeNotDropped = chunks[i].firstTime() @@ -1017,10 +1017,10 @@ func (p *persistence) deleteSeriesFile(fp clientmodel.Fingerprint) (int, error) return numChunks, nil } -// getSeriesFileModTime returns the modification time of the series file -// belonging to the provided fingerprint. In case of an error, the zero value of -// time.Time is returned. -func (p *persistence) getSeriesFileModTime(fp clientmodel.Fingerprint) time.Time { +// seriesFileModTime returns the modification time of the series file belonging +// to the provided fingerprint. In case of an error, the zero value of time.Time +// is returned. +func (p *persistence) seriesFileModTime(fp clientmodel.Fingerprint) time.Time { var modTime time.Time if fi, err := os.Stat(p.fileNameForFingerprint(fp)); err == nil { return fi.ModTime() @@ -1029,17 +1029,17 @@ func (p *persistence) getSeriesFileModTime(fp clientmodel.Fingerprint) time.Time } // indexMetric queues the given metric for addition to the indexes needed by -// getFingerprintsForLabelPair, getLabelValuesForLabelName, and -// getFingerprintsModifiedBefore. If the queue is full, this method blocks -// until the metric can be queued. This method is goroutine-safe. +// fingerprintsForLabelPair, labelValuesForLabelName, and +// fingerprintsModifiedBefore. If the queue is full, this method blocks until +// the metric can be queued. This method is goroutine-safe. func (p *persistence) indexMetric(fp clientmodel.Fingerprint, m clientmodel.Metric) { p.indexingQueue <- indexingOp{fp, m, add} } // unindexMetric queues references to the given metric for removal from the -// indexes used for getFingerprintsForLabelPair, getLabelValuesForLabelName, and -// getFingerprintsModifiedBefore. The index of fingerprints to archived metrics -// is not affected by this removal. (In fact, never call this method for an +// indexes used for fingerprintsForLabelPair, labelValuesForLabelName, and +// fingerprintsModifiedBefore. The index of fingerprints to archived metrics is +// not affected by this removal. (In fact, never call this method for an // archived metric. To purge an archived metric, call purgeArchivedFingerprint.) // If the queue is full, this method blocks until the metric can be queued. This // method is goroutine-safe. @@ -1097,10 +1097,10 @@ func (p *persistence) updateArchivedTimeRange( return p.archivedFingerprintToTimeRange.Put(codable.Fingerprint(fp), codable.TimeRange{First: first, Last: last}) } -// getFingerprintsModifiedBefore returns the fingerprints of archived timeseries +// fingerprintsModifiedBefore returns the fingerprints of archived timeseries // that have live samples before the provided timestamp. This method is // goroutine-safe. -func (p *persistence) getFingerprintsModifiedBefore(beforeTime clientmodel.Timestamp) ([]clientmodel.Fingerprint, error) { +func (p *persistence) fingerprintsModifiedBefore(beforeTime clientmodel.Timestamp) ([]clientmodel.Fingerprint, error) { var fp codable.Fingerprint var tr codable.TimeRange fps := []clientmodel.Fingerprint{} @@ -1119,9 +1119,9 @@ func (p *persistence) getFingerprintsModifiedBefore(beforeTime clientmodel.Times return fps, nil } -// getArchivedMetric retrieves the archived metric with the given -// fingerprint. This method is goroutine-safe. -func (p *persistence) getArchivedMetric(fp clientmodel.Fingerprint) (clientmodel.Metric, error) { +// archivedMetric retrieves the archived metric with the given fingerprint. This +// method is goroutine-safe. +func (p *persistence) archivedMetric(fp clientmodel.Fingerprint) (clientmodel.Metric, error) { metric, _, err := p.archivedFingerprintToMetrics.Lookup(fp) return metric, err } @@ -1137,7 +1137,7 @@ func (p *persistence) purgeArchivedMetric(fp clientmodel.Fingerprint) (err error } }() - metric, err := p.getArchivedMetric(fp) + metric, err := p.archivedMetric(fp) if err != nil || metric == nil { return err } @@ -1567,8 +1567,14 @@ func chunkIndexForOffset(offset int64) (int, error) { func writeChunkHeader(w io.Writer, c chunk) error { header := make([]byte, chunkHeaderLen) header[chunkHeaderTypeOffset] = byte(c.encoding()) - binary.LittleEndian.PutUint64(header[chunkHeaderFirstTimeOffset:], uint64(c.firstTime())) - binary.LittleEndian.PutUint64(header[chunkHeaderLastTimeOffset:], uint64(c.lastTime())) + binary.LittleEndian.PutUint64( + header[chunkHeaderFirstTimeOffset:], + uint64(c.firstTime()), + ) + binary.LittleEndian.PutUint64( + header[chunkHeaderLastTimeOffset:], + uint64(c.newIterator().lastTimestamp()), + ) _, err := w.Write(header) return err } diff --git a/storage/local/persistence_test.go b/storage/local/persistence_test.go index 00e36b792..1bf8b24b7 100644 --- a/storage/local/persistence_test.go +++ b/storage/local/persistence_test.go @@ -70,8 +70,8 @@ func buildTestChunks(encoding chunkEncoding) map[clientmodel.Fingerprint][]chunk } func chunksEqual(c1, c2 chunk) bool { - values2 := c2.values() - for v1 := range c1.values() { + values2 := c2.newIterator().values() + for v1 := range c1.newIterator().values() { v2 := <-values2 if !v1.Equal(v2) { return false @@ -397,7 +397,7 @@ func testCheckpointAndLoadSeriesMapAndHeads(t *testing.T, encoding chunkEncoding if !reflect.DeepEqual(loadedS1.metric, m1) { t.Errorf("want metric %v, got %v", m1, loadedS1.metric) } - if !reflect.DeepEqual(loadedS1.head().chunk, s1.head().chunk) { + if !reflect.DeepEqual(loadedS1.head().c, s1.head().c) { t.Error("head chunks differ") } if loadedS1.chunkDescsOffset != 0 { @@ -413,7 +413,7 @@ func testCheckpointAndLoadSeriesMapAndHeads(t *testing.T, encoding chunkEncoding if !reflect.DeepEqual(loadedS3.metric, m3) { t.Errorf("want metric %v, got %v", m3, loadedS3.metric) } - if loadedS3.head().chunk != nil { + if loadedS3.head().c != nil { t.Error("head chunk not evicted") } if loadedS3.chunkDescsOffset != -1 { @@ -515,7 +515,7 @@ func TestCheckpointAndLoadFPMappings(t *testing.T) { } } -func testGetFingerprintsModifiedBefore(t *testing.T, encoding chunkEncoding) { +func testFingerprintsModifiedBefore(t *testing.T, encoding chunkEncoding) { p, closer := newTestPersistence(t, encoding) defer closer.Close() @@ -537,7 +537,7 @@ func testGetFingerprintsModifiedBefore(t *testing.T, encoding chunkEncoding) { } for ts, want := range expectedFPs { - got, err := p.getFingerprintsModifiedBefore(ts) + got, err := p.fingerprintsModifiedBefore(ts) if err != nil { t.Fatal(err) } @@ -575,7 +575,7 @@ func testGetFingerprintsModifiedBefore(t *testing.T, encoding chunkEncoding) { } for ts, want := range expectedFPs { - got, err := p.getFingerprintsModifiedBefore(ts) + got, err := p.fingerprintsModifiedBefore(ts) if err != nil { t.Fatal(err) } @@ -585,12 +585,12 @@ func testGetFingerprintsModifiedBefore(t *testing.T, encoding chunkEncoding) { } } -func TestGetFingerprintsModifiedBeforeChunkType0(t *testing.T) { - testGetFingerprintsModifiedBefore(t, 0) +func TestFingerprintsModifiedBeforeChunkType0(t *testing.T) { + testFingerprintsModifiedBefore(t, 0) } -func TestGetFingerprintsModifiedBeforeChunkType1(t *testing.T) { - testGetFingerprintsModifiedBefore(t, 1) +func TestFingerprintsModifiedBeforeChunkType1(t *testing.T) { + testFingerprintsModifiedBefore(t, 1) } func testDropArchivedMetric(t *testing.T, encoding chunkEncoding) { @@ -605,7 +605,7 @@ func testDropArchivedMetric(t *testing.T, encoding chunkEncoding) { p.indexMetric(2, m2) p.waitForIndexing() - outFPs, err := p.getFingerprintsForLabelPair(metric.LabelPair{Name: "n1", Value: "v1"}) + outFPs, err := p.fingerprintsForLabelPair(metric.LabelPair{Name: "n1", Value: "v1"}) if err != nil { t.Fatal(err) } @@ -613,7 +613,7 @@ func testDropArchivedMetric(t *testing.T, encoding chunkEncoding) { if !reflect.DeepEqual(outFPs, want) { t.Errorf("want %#v, got %#v", want, outFPs) } - outFPs, err = p.getFingerprintsForLabelPair(metric.LabelPair{Name: "n2", Value: "v2"}) + outFPs, err = p.fingerprintsForLabelPair(metric.LabelPair{Name: "n2", Value: "v2"}) if err != nil { t.Fatal(err) } @@ -637,7 +637,7 @@ func testDropArchivedMetric(t *testing.T, encoding chunkEncoding) { } p.waitForIndexing() - outFPs, err = p.getFingerprintsForLabelPair(metric.LabelPair{Name: "n1", Value: "v1"}) + outFPs, err = p.fingerprintsForLabelPair(metric.LabelPair{Name: "n1", Value: "v1"}) if err != nil { t.Fatal(err) } @@ -645,7 +645,7 @@ func testDropArchivedMetric(t *testing.T, encoding chunkEncoding) { if !reflect.DeepEqual(outFPs, want) { t.Errorf("want %#v, got %#v", want, outFPs) } - outFPs, err = p.getFingerprintsForLabelPair(metric.LabelPair{Name: "n2", Value: "v2"}) + outFPs, err = p.fingerprintsForLabelPair(metric.LabelPair{Name: "n2", Value: "v2"}) if err != nil { t.Fatal(err) } @@ -858,7 +858,7 @@ func verifyIndexedState(i int, t *testing.T, b incrementalBatch, indexedFpsToMet p.waitForIndexing() for fp, m := range indexedFpsToMetrics { // Compare archived metrics with input metrics. - mOut, err := p.getArchivedMetric(fp) + mOut, err := p.archivedMetric(fp) if err != nil { t.Fatal(err) } @@ -884,7 +884,7 @@ func verifyIndexedState(i int, t *testing.T, b incrementalBatch, indexedFpsToMet // Compare label name -> label values mappings. for ln, lvs := range b.expectedLnToLvs { - outLvs, err := p.getLabelValuesForLabelName(ln) + outLvs, err := p.labelValuesForLabelName(ln) if err != nil { t.Fatal(err) } @@ -901,7 +901,7 @@ func verifyIndexedState(i int, t *testing.T, b incrementalBatch, indexedFpsToMet // Compare label pair -> fingerprints mappings. for lp, fps := range b.expectedLpToFps { - outFPs, err := p.getFingerprintsForLabelPair(lp) + outFPs, err := p.fingerprintsForLabelPair(lp) if err != nil { t.Fatal(err) } diff --git a/storage/local/preload.go b/storage/local/preload.go index 077cccca0..4bccc0c42 100644 --- a/storage/local/preload.go +++ b/storage/local/preload.go @@ -40,8 +40,8 @@ func (p *memorySeriesPreloader) PreloadRange( } /* -// GetMetricAtTime implements Preloader. -func (p *memorySeriesPreloader) GetMetricAtTime(fp clientmodel.Fingerprint, t clientmodel.Timestamp) error { +// MetricAtTime implements Preloader. +func (p *memorySeriesPreloader) MetricAtTime(fp clientmodel.Fingerprint, t clientmodel.Timestamp) error { cds, err := p.storage.preloadChunks(fp, &timeSelector{ from: t, through: t, @@ -53,8 +53,8 @@ func (p *memorySeriesPreloader) GetMetricAtTime(fp clientmodel.Fingerprint, t cl return nil } -// GetMetricAtInterval implements Preloader. -func (p *memorySeriesPreloader) GetMetricAtInterval(fp clientmodel.Fingerprint, from, through clientmodel.Timestamp, interval time.Duration) error { +// MetricAtInterval implements Preloader. +func (p *memorySeriesPreloader) MetricAtInterval(fp clientmodel.Fingerprint, from, through clientmodel.Timestamp, interval time.Duration) error { cds, err := p.storage.preloadChunks(fp, &timeSelector{ from: from, through: through, @@ -67,8 +67,8 @@ func (p *memorySeriesPreloader) GetMetricAtInterval(fp clientmodel.Fingerprint, return } -// GetMetricRange implements Preloader. -func (p *memorySeriesPreloader) GetMetricRange(fp clientmodel.Fingerprint, t clientmodel.Timestamp, rangeDuration time.Duration) error { +// MetricRange implements Preloader. +func (p *memorySeriesPreloader) MetricRange(fp clientmodel.Fingerprint, t clientmodel.Timestamp, rangeDuration time.Duration) error { cds, err := p.storage.preloadChunks(fp, &timeSelector{ from: t, through: t, @@ -81,8 +81,8 @@ func (p *memorySeriesPreloader) GetMetricRange(fp clientmodel.Fingerprint, t cli return } -// GetMetricRangeAtInterval implements Preloader. -func (p *memorySeriesPreloader) GetMetricRangeAtInterval(fp clientmodel.Fingerprint, from, through clientmodel.Timestamp, interval, rangeDuration time.Duration) error { +// MetricRangeAtInterval implements Preloader. +func (p *memorySeriesPreloader) MetricRangeAtInterval(fp clientmodel.Fingerprint, from, through clientmodel.Timestamp, interval, rangeDuration time.Duration) error { cds, err := p.storage.preloadChunks(fp, &timeSelector{ from: from, through: through, diff --git a/storage/local/series.go b/storage/local/series.go index 419ae78bc..60c428441 100644 --- a/storage/local/series.go +++ b/storage/local/series.go @@ -208,7 +208,7 @@ func (s *memorySeries) add(v *metric.SamplePair) int { newHead := newChunkDesc(newChunk()) s.chunkDescs = append(s.chunkDescs, newHead) s.headChunkClosed = false - } else if s.headChunkUsedByIterator && s.head().getRefCount() > 1 { + } else if s.headChunkUsedByIterator && s.head().refCount() > 1 { // We only need to clone the head chunk if the current head // chunk was used in an iterator at all and if the refCount is // still greater than the 1 we always have because the head @@ -221,12 +221,12 @@ func (s *memorySeries) add(v *metric.SamplePair) int { chunkOps.WithLabelValues(clone).Inc() // No locking needed here because a non-persisted head chunk can // not get evicted concurrently. - s.head().chunk = s.head().chunk.clone() + s.head().c = s.head().c.clone() s.headChunkUsedByIterator = false } chunks := s.head().add(v) - s.head().chunk = chunks[0] + s.head().c = chunks[0] for _, c := range chunks[1:] { s.chunkDescs = append(s.chunkDescs, newChunkDesc(c)) @@ -415,10 +415,10 @@ func (s *memorySeries) preloadChunksForRange( // newIterator returns a new SeriesIterator. The caller must have locked the // fingerprint of the memorySeries. -func (s *memorySeries) newIterator(lockFunc, unlockFunc func()) SeriesIterator { +func (s *memorySeries) newIterator() SeriesIterator { chunks := make([]chunk, 0, len(s.chunkDescs)) for i, cd := range s.chunkDescs { - if chunk := cd.getChunk(); chunk != nil { + if chunk := cd.chunk(); chunk != nil { if i == len(s.chunkDescs)-1 && !s.headChunkClosed { s.headChunkUsedByIterator = true } @@ -427,9 +427,8 @@ func (s *memorySeries) newIterator(lockFunc, unlockFunc func()) SeriesIterator { } return &memorySeriesIterator{ - lock: lockFunc, - unlock: unlockFunc, - chunks: chunks, + chunks: chunks, + chunkIts: make([]chunkIterator, len(chunks)), } } @@ -449,13 +448,13 @@ func (s *memorySeries) firstTime() clientmodel.Timestamp { return s.savedFirstTime } -// getChunksToPersist returns a slice of chunkDescs eligible for -// persistence. It's the caller's responsibility to actually persist the -// returned chunks afterwards. The method sets the persistWatermark and the -// dirty flag accordingly. +// chunksToPersist returns a slice of chunkDescs eligible for persistence. It's +// the caller's responsibility to actually persist the returned chunks +// afterwards. The method sets the persistWatermark and the dirty flag +// accordingly. // // The caller must have locked the fingerprint of the series. -func (s *memorySeries) getChunksToPersist() []*chunkDesc { +func (s *memorySeries) chunksToPersist() []*chunkDesc { newWatermark := len(s.chunkDescs) if !s.headChunkClosed { newWatermark-- @@ -471,108 +470,124 @@ func (s *memorySeries) getChunksToPersist() []*chunkDesc { // memorySeriesIterator implements SeriesIterator. type memorySeriesIterator struct { - lock, unlock func() - chunkIt chunkIterator - chunks []chunk + chunkIt chunkIterator // Last chunkIterator used by ValueAtTime. + chunkIts []chunkIterator // Caches chunkIterators. + chunks []chunk } -// GetValueAtTime implements SeriesIterator. -func (it *memorySeriesIterator) GetValueAtTime(t clientmodel.Timestamp) metric.Values { - it.lock() - defer it.unlock() - +// ValueAtTime implements SeriesIterator. +func (it *memorySeriesIterator) ValueAtTime(t clientmodel.Timestamp) metric.Values { // The most common case. We are iterating through a chunk. if it.chunkIt != nil && it.chunkIt.contains(t) { - return it.chunkIt.getValueAtTime(t) + return it.chunkIt.valueAtTime(t) } - it.chunkIt = nil - if len(it.chunks) == 0 { return nil } // Before or exactly on the first sample of the series. - if !t.After(it.chunks[0].firstTime()) { + it.chunkIt = it.chunkIterator(0) + ts := it.chunkIt.timestampAtIndex(0) + if !t.After(ts) { // return first value of first chunk - return it.chunks[0].newIterator().getValueAtTime(t) - } - // After or exactly on the last sample of the series. - if !t.Before(it.chunks[len(it.chunks)-1].lastTime()) { - // return last value of last chunk - return it.chunks[len(it.chunks)-1].newIterator().getValueAtTime(t) + return metric.Values{metric.SamplePair{ + Timestamp: ts, + Value: it.chunkIt.sampleValueAtIndex(0), + }} } - // Find first chunk where lastTime() is after or equal to t. + // After or exactly on the last sample of the series. + it.chunkIt = it.chunkIterator(len(it.chunks) - 1) + ts = it.chunkIt.lastTimestamp() + if !t.Before(ts) { + // return last value of last chunk + return metric.Values{metric.SamplePair{ + Timestamp: ts, + Value: it.chunkIt.sampleValueAtIndex(it.chunkIt.length() - 1), + }} + } + + // Find last chunk where firstTime() is before or equal to t. + l := len(it.chunks) - 1 i := sort.Search(len(it.chunks), func(i int) bool { - return !it.chunks[i].lastTime().Before(t) + return !it.chunks[l-i].firstTime().After(t) }) if i == len(it.chunks) { panic("out of bounds") } - - if t.Before(it.chunks[i].firstTime()) { + it.chunkIt = it.chunkIterator(l - i) + ts = it.chunkIt.lastTimestamp() + if t.After(ts) { // We ended up between two chunks. + sp1 := metric.SamplePair{ + Timestamp: ts, + Value: it.chunkIt.sampleValueAtIndex(it.chunkIt.length() - 1), + } + it.chunkIt = it.chunkIterator(l - i + 1) return metric.Values{ - it.chunks[i-1].newIterator().getValueAtTime(t)[0], - it.chunks[i].newIterator().getValueAtTime(t)[0], + sp1, + metric.SamplePair{ + Timestamp: it.chunkIt.timestampAtIndex(0), + Value: it.chunkIt.sampleValueAtIndex(0), + }, } } - // We ended up in the middle of a chunk. We might stay there for a while, - // so save it as the current chunk iterator. - it.chunkIt = it.chunks[i].newIterator() - return it.chunkIt.getValueAtTime(t) + return it.chunkIt.valueAtTime(t) } -// GetBoundaryValues implements SeriesIterator. -func (it *memorySeriesIterator) GetBoundaryValues(in metric.Interval) metric.Values { - it.lock() - defer it.unlock() - - // Find the first relevant chunk. +// BoundaryValues implements SeriesIterator. +func (it *memorySeriesIterator) BoundaryValues(in metric.Interval) metric.Values { + // Find the first chunk for which the first sample is within the interval. i := sort.Search(len(it.chunks), func(i int) bool { - return !it.chunks[i].lastTime().Before(in.OldestInclusive) + return !it.chunks[i].firstTime().Before(in.OldestInclusive) }) + // Only now check the last timestamp of the previous chunk (which is + // fairly expensive). + if i > 0 && !it.chunkIterator(i-1).lastTimestamp().Before(in.OldestInclusive) { + i-- + } + values := make(metric.Values, 0, 2) - for i, c := range it.chunks[i:] { - var chunkIt chunkIterator + for j, c := range it.chunks[i:] { if c.firstTime().After(in.NewestInclusive) { if len(values) == 1 { - // We found the first value before, but are now + // We found the first value before but are now // already past the last value. The value we // want must be the last value of the previous // chunk. So backtrack... - chunkIt = it.chunks[i-1].newIterator() - values = append(values, chunkIt.getValueAtTime(in.NewestInclusive)[0]) + chunkIt := it.chunkIterator(i + j - 1) + values = append(values, metric.SamplePair{ + Timestamp: chunkIt.lastTimestamp(), + Value: chunkIt.lastSampleValue(), + }) } break } + chunkIt := it.chunkIterator(i + j) if len(values) == 0 { - chunkIt = c.newIterator() - firstValues := chunkIt.getValueAtTime(in.OldestInclusive) + firstValues := chunkIt.valueAtTime(in.OldestInclusive) switch len(firstValues) { case 2: values = append(values, firstValues[1]) case 1: values = firstValues default: - panic("unexpected return from getValueAtTime") + panic("unexpected return from valueAtTime") } } - if c.lastTime().After(in.NewestInclusive) { - if chunkIt == nil { - chunkIt = c.newIterator() - } - values = append(values, chunkIt.getValueAtTime(in.NewestInclusive)[0]) + if chunkIt.lastTimestamp().After(in.NewestInclusive) { + values = append(values, chunkIt.valueAtTime(in.NewestInclusive)[0]) break } } if len(values) == 1 { // We found exactly one value. In that case, add the most recent we know. - values = append( - values, - it.chunks[len(it.chunks)-1].newIterator().getValueAtTime(in.NewestInclusive)[0], - ) + chunkIt := it.chunkIterator(len(it.chunks) - 1) + values = append(values, metric.SamplePair{ + Timestamp: chunkIt.lastTimestamp(), + Value: chunkIt.lastSampleValue(), + }) } if len(values) == 2 && values[0].Equal(&values[1]) { return values[:1] @@ -580,41 +595,53 @@ func (it *memorySeriesIterator) GetBoundaryValues(in metric.Interval) metric.Val return values } -// GetRangeValues implements SeriesIterator. -func (it *memorySeriesIterator) GetRangeValues(in metric.Interval) metric.Values { - it.lock() - defer it.unlock() - - // Find the first relevant chunk. +// RangeValues implements SeriesIterator. +func (it *memorySeriesIterator) RangeValues(in metric.Interval) metric.Values { + // Find the first chunk for which the first sample is within the interval. i := sort.Search(len(it.chunks), func(i int) bool { - return !it.chunks[i].lastTime().Before(in.OldestInclusive) + return !it.chunks[i].firstTime().Before(in.OldestInclusive) }) + // Only now check the last timestamp of the previous chunk (which is + // fairly expensive). + if i > 0 && !it.chunkIterator(i-1).lastTimestamp().Before(in.OldestInclusive) { + i-- + } + values := metric.Values{} - for _, c := range it.chunks[i:] { + for j, c := range it.chunks[i:] { if c.firstTime().After(in.NewestInclusive) { break } - // TODO: actually reuse an iterator between calls if we get multiple ranges - // from the same chunk. - values = append(values, c.newIterator().getRangeValues(in)...) + values = append(values, it.chunkIterator(i+j).rangeValues(in)...) } return values } +// chunkIterator returns the chunkIterator for the chunk at position i (and +// creates it if needed). +func (it *memorySeriesIterator) chunkIterator(i int) chunkIterator { + chunkIt := it.chunkIts[i] + if chunkIt == nil { + chunkIt = it.chunks[i].newIterator() + it.chunkIts[i] = chunkIt + } + return chunkIt +} + // nopSeriesIterator implements Series Iterator. It never returns any values. type nopSeriesIterator struct{} -// GetValueAtTime implements SeriesIterator. -func (_ nopSeriesIterator) GetValueAtTime(t clientmodel.Timestamp) metric.Values { +// ValueAtTime implements SeriesIterator. +func (_ nopSeriesIterator) ValueAtTime(t clientmodel.Timestamp) metric.Values { return metric.Values{} } -// GetBoundaryValues implements SeriesIterator. -func (_ nopSeriesIterator) GetBoundaryValues(in metric.Interval) metric.Values { +// BoundaryValues implements SeriesIterator. +func (_ nopSeriesIterator) BoundaryValues(in metric.Interval) metric.Values { return metric.Values{} } -// GetRangeValues implements SeriesIterator. -func (_ nopSeriesIterator) GetRangeValues(in metric.Interval) metric.Values { +// RangeValues implements SeriesIterator. +func (_ nopSeriesIterator) RangeValues(in metric.Interval) metric.Values { return metric.Values{} } diff --git a/storage/local/storage.go b/storage/local/storage.go index a57f2e84e..e4fda7789 100644 --- a/storage/local/storage.go +++ b/storage/local/storage.go @@ -278,10 +278,7 @@ func (s *memorySeriesStorage) NewIterator(fp clientmodel.Fingerprint) SeriesIter // return any values. return nopSeriesIterator{} } - return series.newIterator( - func() { s.fpLocker.Lock(fp) }, - func() { s.fpLocker.Unlock(fp) }, - ) + return series.newIterator() } // NewPreloader implements Storage. @@ -291,14 +288,14 @@ func (s *memorySeriesStorage) NewPreloader() Preloader { } } -// GetFingerprintsForLabelMatchers implements Storage. -func (s *memorySeriesStorage) GetFingerprintsForLabelMatchers(labelMatchers metric.LabelMatchers) clientmodel.Fingerprints { +// FingerprintsForLabelMatchers implements Storage. +func (s *memorySeriesStorage) FingerprintsForLabelMatchers(labelMatchers metric.LabelMatchers) clientmodel.Fingerprints { var result map[clientmodel.Fingerprint]struct{} for _, matcher := range labelMatchers { intersection := map[clientmodel.Fingerprint]struct{}{} switch matcher.Type { case metric.Equal: - fps, err := s.persistence.getFingerprintsForLabelPair( + fps, err := s.persistence.fingerprintsForLabelPair( metric.LabelPair{ Name: matcher.Name, Value: matcher.Value, @@ -316,7 +313,7 @@ func (s *memorySeriesStorage) GetFingerprintsForLabelMatchers(labelMatchers metr } } default: - values, err := s.persistence.getLabelValuesForLabelName(matcher.Name) + values, err := s.persistence.labelValuesForLabelName(matcher.Name) if err != nil { log.Errorf("Error getting label values for label name %q: %v", matcher.Name, err) } @@ -325,7 +322,7 @@ func (s *memorySeriesStorage) GetFingerprintsForLabelMatchers(labelMatchers metr return nil } for _, v := range matches { - fps, err := s.persistence.getFingerprintsForLabelPair( + fps, err := s.persistence.fingerprintsForLabelPair( metric.LabelPair{ Name: matcher.Name, Value: v, @@ -354,17 +351,17 @@ func (s *memorySeriesStorage) GetFingerprintsForLabelMatchers(labelMatchers metr return fps } -// GetLabelValuesForLabelName implements Storage. -func (s *memorySeriesStorage) GetLabelValuesForLabelName(labelName clientmodel.LabelName) clientmodel.LabelValues { - lvs, err := s.persistence.getLabelValuesForLabelName(labelName) +// LabelValuesForLabelName implements Storage. +func (s *memorySeriesStorage) LabelValuesForLabelName(labelName clientmodel.LabelName) clientmodel.LabelValues { + lvs, err := s.persistence.labelValuesForLabelName(labelName) if err != nil { log.Errorf("Error getting label values for label name %q: %v", labelName, err) } return lvs } -// GetMetricForFingerprint implements Storage. -func (s *memorySeriesStorage) GetMetricForFingerprint(fp clientmodel.Fingerprint) clientmodel.COWMetric { +// MetricForFingerprint implements Storage. +func (s *memorySeriesStorage) MetricForFingerprint(fp clientmodel.Fingerprint) clientmodel.COWMetric { s.fpLocker.Lock(fp) defer s.fpLocker.Unlock(fp) @@ -376,7 +373,7 @@ func (s *memorySeriesStorage) GetMetricForFingerprint(fp clientmodel.Fingerprint Metric: series.metric, } } - metric, err := s.persistence.getArchivedMetric(fp) + metric, err := s.persistence.archivedMetric(fp) if err != nil { log.Errorf("Error retrieving archived metric for fingerprint %v: %v", fp, err) } @@ -459,7 +456,7 @@ func (s *memorySeriesStorage) preloadChunksForRange( return nil, nil } if from.Add(-stalenessDelta).Before(last) && through.Add(stalenessDelta).After(first) { - metric, err := s.persistence.getArchivedMetric(fp) + metric, err := s.persistence.archivedMetric(fp) if err != nil { return nil, err } @@ -648,7 +645,7 @@ func (s *memorySeriesStorage) cycleThroughArchivedFingerprints() chan clientmode defer close(archivedFingerprints) for { - archivedFPs, err := s.persistence.getFingerprintsModifiedBefore( + archivedFPs, err := s.persistence.fingerprintsModifiedBefore( clientmodel.TimestampFromTime(time.Now()).Add(-s.dropAfter), ) if err != nil { @@ -844,20 +841,21 @@ func (s *memorySeriesStorage) maintainMemorySeries( func (s *memorySeriesStorage) writeMemorySeries( fp clientmodel.Fingerprint, series *memorySeries, beforeTime clientmodel.Timestamp, ) bool { - cds := series.getChunksToPersist() + cds := series.chunksToPersist() defer func() { for _, cd := range cds { cd.unpin(s.evictRequests) } s.incNumChunksToPersist(-len(cds)) chunkOps.WithLabelValues(persistAndUnpin).Add(float64(len(cds))) - series.modTime = s.persistence.getSeriesFileModTime(fp) + series.modTime = s.persistence.seriesFileModTime(fp) }() // Get the actual chunks from underneath the chunkDescs. + // No lock required as chunks still to persist cannot be evicted. chunks := make([]chunk, len(cds)) for i, cd := range cds { - chunks[i] = cd.chunk + chunks[i] = cd.c } if !series.firstTime().Before(beforeTime) { diff --git a/storage/local/storage_test.go b/storage/local/storage_test.go index 479133915..8b43ae2a1 100644 --- a/storage/local/storage_test.go +++ b/storage/local/storage_test.go @@ -28,7 +28,7 @@ import ( "github.com/prometheus/prometheus/utility/test" ) -func TestGetFingerprintsForLabelMatchers(t *testing.T) { +func TestFingerprintsForLabelMatchers(t *testing.T) { storage, closer := NewTestStorage(t, 1) defer closer.Close() @@ -121,7 +121,7 @@ func TestGetFingerprintsForLabelMatchers(t *testing.T) { } for _, mt := range matcherTests { - resfps := storage.GetFingerprintsForLabelMatchers(mt.matchers) + resfps := storage.FingerprintsForLabelMatchers(mt.matchers) if len(mt.expected) != len(resfps) { t.Fatalf("expected %d matches for %q, found %d", len(mt.expected), mt.matchers, len(resfps)) } @@ -208,7 +208,7 @@ func testChunk(t *testing.T, encoding chunkEncoding) { if cd.isEvicted() { continue } - for sample := range cd.chunk.values() { + for sample := range cd.c.newIterator().values() { values = append(values, *sample) } } @@ -234,8 +234,8 @@ func TestChunkType1(t *testing.T) { testChunk(t, 1) } -func testGetValueAtTime(t *testing.T, encoding chunkEncoding) { - samples := make(clientmodel.Samples, 1000) +func testValueAtTime(t *testing.T, encoding chunkEncoding) { + samples := make(clientmodel.Samples, 10000) for i := range samples { samples[i] = &clientmodel.Sample{ Timestamp: clientmodel.Timestamp(2 * i), @@ -256,7 +256,7 @@ func testGetValueAtTime(t *testing.T, encoding chunkEncoding) { // #1 Exactly on a sample. for i, expected := range samples { - actual := it.GetValueAtTime(expected.Timestamp) + actual := it.ValueAtTime(expected.Timestamp) if len(actual) != 1 { t.Fatalf("1.%d. Expected exactly one result, got %d.", i, len(actual)) @@ -275,7 +275,7 @@ func testGetValueAtTime(t *testing.T, encoding chunkEncoding) { continue } expected2 := samples[i+1] - actual := it.GetValueAtTime(expected1.Timestamp + 1) + actual := it.ValueAtTime(expected1.Timestamp + 1) if len(actual) != 2 { t.Fatalf("2.%d. Expected exactly 2 results, got %d.", i, len(actual)) @@ -296,7 +296,7 @@ func testGetValueAtTime(t *testing.T, encoding chunkEncoding) { // #3 Corner cases: Just before the first sample, just after the last. expected := samples[0] - actual := it.GetValueAtTime(expected.Timestamp - 1) + actual := it.ValueAtTime(expected.Timestamp - 1) if len(actual) != 1 { t.Fatalf("3.1. Expected exactly one result, got %d.", len(actual)) } @@ -307,7 +307,7 @@ func testGetValueAtTime(t *testing.T, encoding chunkEncoding) { t.Errorf("3.1. Got %v; want %v", actual[0].Value, expected.Value) } expected = samples[len(samples)-1] - actual = it.GetValueAtTime(expected.Timestamp + 1) + actual = it.ValueAtTime(expected.Timestamp + 1) if len(actual) != 1 { t.Fatalf("3.2. Expected exactly one result, got %d.", len(actual)) } @@ -319,16 +319,89 @@ func testGetValueAtTime(t *testing.T, encoding chunkEncoding) { } } -func TestGetValueAtTimeChunkType0(t *testing.T) { - testGetValueAtTime(t, 0) +func TestValueAtTimeChunkType0(t *testing.T) { + testValueAtTime(t, 0) } -func TestGetValueAtTimeChunkType1(t *testing.T) { - testGetValueAtTime(t, 1) +func TestValueAtTimeChunkType1(t *testing.T) { + testValueAtTime(t, 1) } -func testGetRangeValues(t *testing.T, encoding chunkEncoding) { - samples := make(clientmodel.Samples, 1000) +func benchmarkValueAtTime(b *testing.B, encoding chunkEncoding) { + samples := make(clientmodel.Samples, 10000) + for i := range samples { + samples[i] = &clientmodel.Sample{ + Timestamp: clientmodel.Timestamp(2 * i), + Value: clientmodel.SampleValue(float64(i) * 0.2), + } + } + s, closer := NewTestStorage(b, encoding) + defer closer.Close() + + for _, sample := range samples { + s.Append(sample) + } + s.WaitForIndexing() + + fp := clientmodel.Metric{}.FastFingerprint() + + b.ResetTimer() + + for i := 0; i < b.N; i++ { + it := s.NewIterator(fp) + + // #1 Exactly on a sample. + for i, expected := range samples { + actual := it.ValueAtTime(expected.Timestamp) + + if len(actual) != 1 { + b.Fatalf("1.%d. Expected exactly one result, got %d.", i, len(actual)) + } + if expected.Timestamp != actual[0].Timestamp { + b.Errorf("1.%d. Got %v; want %v", i, actual[0].Timestamp, expected.Timestamp) + } + if expected.Value != actual[0].Value { + b.Errorf("1.%d. Got %v; want %v", i, actual[0].Value, expected.Value) + } + } + + // #2 Between samples. + for i, expected1 := range samples { + if i == len(samples)-1 { + continue + } + expected2 := samples[i+1] + actual := it.ValueAtTime(expected1.Timestamp + 1) + + if len(actual) != 2 { + b.Fatalf("2.%d. Expected exactly 2 results, got %d.", i, len(actual)) + } + if expected1.Timestamp != actual[0].Timestamp { + b.Errorf("2.%d. Got %v; want %v", i, actual[0].Timestamp, expected1.Timestamp) + } + if expected1.Value != actual[0].Value { + b.Errorf("2.%d. Got %v; want %v", i, actual[0].Value, expected1.Value) + } + if expected2.Timestamp != actual[1].Timestamp { + b.Errorf("2.%d. Got %v; want %v", i, actual[1].Timestamp, expected1.Timestamp) + } + if expected2.Value != actual[1].Value { + b.Errorf("2.%d. Got %v; want %v", i, actual[1].Value, expected1.Value) + } + } + } +} + +func BenchmarkValueAtTimeChunkType0(b *testing.B) { + benchmarkValueAtTime(b, 0) +} + +func BenchmarkValueAtTimeChunkType1(b *testing.B) { + benchmarkValueAtTime(b, 1) +} + +func testRangeValues(t *testing.T, encoding chunkEncoding) { + samples := make(clientmodel.Samples, 10000) for i := range samples { samples[i] = &clientmodel.Sample{ Timestamp: clientmodel.Timestamp(2 * i), @@ -349,7 +422,7 @@ func testGetRangeValues(t *testing.T, encoding chunkEncoding) { // #1 Zero length interval at sample. for i, expected := range samples { - actual := it.GetRangeValues(metric.Interval{ + actual := it.RangeValues(metric.Interval{ OldestInclusive: expected.Timestamp, NewestInclusive: expected.Timestamp, }) @@ -367,7 +440,7 @@ func testGetRangeValues(t *testing.T, encoding chunkEncoding) { // #2 Zero length interval off sample. for i, expected := range samples { - actual := it.GetRangeValues(metric.Interval{ + actual := it.RangeValues(metric.Interval{ OldestInclusive: expected.Timestamp + 1, NewestInclusive: expected.Timestamp + 1, }) @@ -379,7 +452,7 @@ func testGetRangeValues(t *testing.T, encoding chunkEncoding) { // #3 2sec interval around sample. for i, expected := range samples { - actual := it.GetRangeValues(metric.Interval{ + actual := it.RangeValues(metric.Interval{ OldestInclusive: expected.Timestamp - 1, NewestInclusive: expected.Timestamp + 1, }) @@ -401,7 +474,7 @@ func testGetRangeValues(t *testing.T, encoding chunkEncoding) { continue } expected2 := samples[i+1] - actual := it.GetRangeValues(metric.Interval{ + actual := it.RangeValues(metric.Interval{ OldestInclusive: expected1.Timestamp, NewestInclusive: expected1.Timestamp + 2, }) @@ -426,7 +499,7 @@ func testGetRangeValues(t *testing.T, encoding chunkEncoding) { // #5 corner cases: Interval ends at first sample, interval starts // at last sample, interval entirely before/after samples. expected := samples[0] - actual := it.GetRangeValues(metric.Interval{ + actual := it.RangeValues(metric.Interval{ OldestInclusive: expected.Timestamp - 2, NewestInclusive: expected.Timestamp, }) @@ -440,7 +513,7 @@ func testGetRangeValues(t *testing.T, encoding chunkEncoding) { t.Errorf("5.1. Got %v; want %v.", actual[0].Value, expected.Value) } expected = samples[len(samples)-1] - actual = it.GetRangeValues(metric.Interval{ + actual = it.RangeValues(metric.Interval{ OldestInclusive: expected.Timestamp, NewestInclusive: expected.Timestamp + 2, }) @@ -454,7 +527,7 @@ func testGetRangeValues(t *testing.T, encoding chunkEncoding) { t.Errorf("5.2. Got %v; want %v.", actual[0].Value, expected.Value) } firstSample := samples[0] - actual = it.GetRangeValues(metric.Interval{ + actual = it.RangeValues(metric.Interval{ OldestInclusive: firstSample.Timestamp - 4, NewestInclusive: firstSample.Timestamp - 2, }) @@ -462,7 +535,7 @@ func testGetRangeValues(t *testing.T, encoding chunkEncoding) { t.Fatalf("5.3. Expected no results, got %d.", len(actual)) } lastSample := samples[len(samples)-1] - actual = it.GetRangeValues(metric.Interval{ + actual = it.RangeValues(metric.Interval{ OldestInclusive: lastSample.Timestamp + 2, NewestInclusive: lastSample.Timestamp + 4, }) @@ -471,16 +544,61 @@ func testGetRangeValues(t *testing.T, encoding chunkEncoding) { } } -func TestGetRangeValuesChunkType0(t *testing.T) { - testGetRangeValues(t, 0) +func TestRangeValuesChunkType0(t *testing.T) { + testRangeValues(t, 0) } -func TestGetRangeValuesChunkType1(t *testing.T) { - testGetRangeValues(t, 1) +func TestRangeValuesChunkType1(t *testing.T) { + testRangeValues(t, 1) +} + +func benchmarkRangeValues(b *testing.B, encoding chunkEncoding) { + samples := make(clientmodel.Samples, 10000) + for i := range samples { + samples[i] = &clientmodel.Sample{ + Timestamp: clientmodel.Timestamp(2 * i), + Value: clientmodel.SampleValue(float64(i) * 0.2), + } + } + s, closer := NewTestStorage(b, encoding) + defer closer.Close() + + for _, sample := range samples { + s.Append(sample) + } + s.WaitForIndexing() + + fp := clientmodel.Metric{}.FastFingerprint() + + b.ResetTimer() + + for i := 0; i < b.N; i++ { + + it := s.NewIterator(fp) + + for _, sample := range samples { + actual := it.RangeValues(metric.Interval{ + OldestInclusive: sample.Timestamp - 20, + NewestInclusive: sample.Timestamp + 20, + }) + + if len(actual) < 10 { + b.Fatalf("not enough samples found") + } + } + } +} + +func BenchmarkRangeValuesChunkType0(b *testing.B) { + benchmarkRangeValues(b, 0) +} + +func BenchmarkRangeValuesChunkType1(b *testing.B) { + benchmarkRangeValues(b, 1) } func testEvictAndPurgeSeries(t *testing.T, encoding chunkEncoding) { - samples := make(clientmodel.Samples, 1000) + samples := make(clientmodel.Samples, 10000) for i := range samples { samples[i] = &clientmodel.Sample{ Timestamp: clientmodel.Timestamp(2 * i), @@ -498,29 +616,29 @@ func testEvictAndPurgeSeries(t *testing.T, encoding chunkEncoding) { fp := clientmodel.Metric{}.FastFingerprint() // Drop ~half of the chunks. - s.maintainMemorySeries(fp, 1000) + s.maintainMemorySeries(fp, 10000) it := s.NewIterator(fp) - actual := it.GetBoundaryValues(metric.Interval{ + actual := it.BoundaryValues(metric.Interval{ OldestInclusive: 0, - NewestInclusive: 10000, + NewestInclusive: 100000, }) if len(actual) != 2 { t.Fatal("expected two results after purging half of series") } - if actual[0].Timestamp < 600 || actual[0].Timestamp > 1000 { + if actual[0].Timestamp < 6000 || actual[0].Timestamp > 10000 { t.Errorf("1st timestamp out of expected range: %v", actual[0].Timestamp) } - want := clientmodel.Timestamp(1998) + want := clientmodel.Timestamp(19998) if actual[1].Timestamp != want { t.Errorf("2nd timestamp: want %v, got %v", want, actual[1].Timestamp) } // Drop everything. - s.maintainMemorySeries(fp, 10000) + s.maintainMemorySeries(fp, 100000) it = s.NewIterator(fp) - actual = it.GetBoundaryValues(metric.Interval{ + actual = it.BoundaryValues(metric.Interval{ OldestInclusive: 0, - NewestInclusive: 10000, + NewestInclusive: 100000, }) if len(actual) != 0 { t.Fatal("expected zero results after purging the whole series") @@ -558,7 +676,7 @@ func testEvictAndPurgeSeries(t *testing.T, encoding chunkEncoding) { } // Drop ~half of the chunks of an archived series. - s.maintainArchivedSeries(fp, 1000) + s.maintainArchivedSeries(fp, 10000) archived, _, _, err = s.persistence.hasArchivedMetric(fp) if err != nil { t.Fatal(err) @@ -568,7 +686,7 @@ func testEvictAndPurgeSeries(t *testing.T, encoding chunkEncoding) { } // Drop everything. - s.maintainArchivedSeries(fp, 10000) + s.maintainArchivedSeries(fp, 100000) archived, _, _, err = s.persistence.hasArchivedMetric(fp) if err != nil { t.Fatal(err) @@ -625,7 +743,7 @@ func testEvictAndPurgeSeries(t *testing.T, encoding chunkEncoding) { // This will archive again, but must not drop it completely, despite the // memorySeries being empty. - s.maintainMemorySeries(fp, 1000) + s.maintainMemorySeries(fp, 10000) archived, _, _, err = s.persistence.hasArchivedMetric(fp) if err != nil { t.Fatal(err) @@ -685,7 +803,7 @@ func testFuzz(t *testing.T, encoding chunkEncoding) { s, c := NewTestStorage(t, encoding) defer c.Close() - samples := createRandomSamples("test_fuzz", 1000) + samples := createRandomSamples("test_fuzz", 10000) for _, sample := range samples { s.Append(sample) } @@ -917,7 +1035,7 @@ func verifyStorage(t testing.TB, s *memorySeriesStorage, samples clientmodel.Sam } p := s.NewPreloader() p.PreloadRange(fp, sample.Timestamp, sample.Timestamp, time.Hour) - found := s.NewIterator(fp).GetValueAtTime(sample.Timestamp) + found := s.NewIterator(fp).ValueAtTime(sample.Timestamp) if len(found) != 1 { t.Errorf("Sample %#v: Expected exactly one value, found %d.", sample, len(found)) result = false diff --git a/web/api/query.go b/web/api/query.go index d606898b4..52da47f8b 100644 --- a/web/api/query.go +++ b/web/api/query.go @@ -156,7 +156,7 @@ func (serv MetricsService) Metrics(w http.ResponseWriter, r *http.Request) { setAccessControlHeaders(w) w.Header().Set("Content-Type", "application/json") - metricNames := serv.Storage.GetLabelValuesForLabelName(clientmodel.MetricNameLabel) + metricNames := serv.Storage.LabelValuesForLabelName(clientmodel.MetricNameLabel) sort.Sort(metricNames) resultBytes, err := json.Marshal(metricNames) if err != nil {