mirror of
https://github.com/prometheus/prometheus.git
synced 2025-01-13 14:57:40 -08:00
Make chunkIterator even leaner.
This commit is contained in:
parent
7cdfae1466
commit
161eada3ad
|
@ -260,42 +260,52 @@ type chunk interface {
|
|||
|
||||
// A chunkIterator enables efficient access to the content of a chunk. It is
|
||||
// generally not safe to use a chunkIterator concurrently with or after chunk
|
||||
// mutation. The error returned by any of the methods is always the last error
|
||||
// encountered by the iterator, i.e. once an error has been encountered, no
|
||||
// method will ever return a nil error again. In general, errors signal data
|
||||
// corruption in the chunk and require quarantining.
|
||||
// mutation.
|
||||
type chunkIterator interface {
|
||||
// Gets the last timestamp in the chunk.
|
||||
lastTimestamp() (model.Time, error)
|
||||
// Gets the value that is closest before the given time. In case a value
|
||||
// exists at precisely the given time, that value is returned. If no
|
||||
// applicable value exists, ZeroSamplePair is returned.
|
||||
valueAtOrBeforeTime(model.Time) (model.SamplePair, error)
|
||||
// Gets all values contained within a given interval.
|
||||
rangeValues(metric.Interval) ([]model.SamplePair, error)
|
||||
// Whether a given timestamp is contained between first and last value
|
||||
// in the chunk.
|
||||
contains(model.Time) (bool, error)
|
||||
// scan, value, and err implement a bufio.Scanner-like interface. The
|
||||
// scan method advances the iterator to the next value in the chunk and
|
||||
// returns true if that worked. In that case, the value method will
|
||||
// return the sample pair the iterator has advanced to. If the scan
|
||||
// method returns false, either an error has occured or the end of the
|
||||
// chunk has been reached. In the former case, the err method will
|
||||
// return the error. In the latter case, the err method will return nil.
|
||||
// Upon creation, the iterator is at position "minus one". After the
|
||||
// first scan call, value will return the 1st value in the
|
||||
// chunk. valueAtOrBeforeTime and rangeValues all modify the iterator
|
||||
// position, too. They behave as if their return values were retrieved
|
||||
// after a scan call, i.e. calling the value or err methods after a call
|
||||
// to those methods will retrieve the same return value again (or the
|
||||
// last value in the range in case of rangeValues), and subsequent scan
|
||||
// calls will advance the iterator from there.
|
||||
// Scans the next value in the chunk. Directly after the iterator has
|
||||
// been created, the next value is the first value in the
|
||||
// chunk. Otherwise, it is the value following the last value scanned or
|
||||
// found (by one of the find... methods). Returns false if either the
|
||||
// end of the chunk is reached or an error has occurred.
|
||||
scan() bool
|
||||
// Finds the most recent value at or before the provided time. Returns
|
||||
// false if either the chunk contains no value at or before the provided
|
||||
// time, or an error has occurred.
|
||||
findAtOrBefore(model.Time) bool
|
||||
// Finds the oldest value at or after the provided time. Returns false
|
||||
// if either the chunk contains no value at or after the provided time,
|
||||
// or an error has occurred.
|
||||
findAtOrAfter(model.Time) bool
|
||||
// Returns the last value scanned (by the scan method) or found (by one
|
||||
// of the find... methods). It returns ZeroSamplePair before any of
|
||||
// those methods were called.
|
||||
value() model.SamplePair
|
||||
// Returns the last error encountered. In general, an error signal data
|
||||
// corruption in the chunk and requires quarantining.
|
||||
err() error
|
||||
}
|
||||
|
||||
// rangeValues is a utility function that retrieves all values within the given
|
||||
// range from a chunkIterator.
|
||||
func rangeValues(it chunkIterator, in metric.Interval) ([]model.SamplePair, error) {
|
||||
result := []model.SamplePair{}
|
||||
if !it.findAtOrAfter(in.OldestInclusive) {
|
||||
return result, it.err()
|
||||
}
|
||||
for !it.value().Timestamp.After(in.NewestInclusive) {
|
||||
result = append(result, it.value())
|
||||
if !it.scan() {
|
||||
break
|
||||
}
|
||||
}
|
||||
return result, it.err()
|
||||
}
|
||||
|
||||
func transcodeAndAdd(dst chunk, src chunk, s model.SamplePair) ([]chunk, error) {
|
||||
chunkOps.WithLabelValues(transcode).Inc()
|
||||
|
||||
|
@ -374,46 +384,6 @@ func (it *indexAccessingChunkIterator) lastTimestamp() (model.Time, error) {
|
|||
return it.acc.timestampAtIndex(it.len - 1), it.acc.err()
|
||||
}
|
||||
|
||||
// valueAtOrBeforeTime implements chunkIterator.
|
||||
func (it *indexAccessingChunkIterator) valueAtOrBeforeTime(t model.Time) (model.SamplePair, error) {
|
||||
i := sort.Search(it.len, func(i int) bool {
|
||||
return it.acc.timestampAtIndex(i).After(t)
|
||||
})
|
||||
if i == 0 || it.acc.err() != nil {
|
||||
return ZeroSamplePair, it.acc.err()
|
||||
}
|
||||
it.pos = i - 1
|
||||
it.lastValue = model.SamplePair{
|
||||
Timestamp: it.acc.timestampAtIndex(i - 1),
|
||||
Value: it.acc.sampleValueAtIndex(i - 1),
|
||||
}
|
||||
return it.lastValue, it.acc.err()
|
||||
}
|
||||
|
||||
// rangeValues implements chunkIterator.
|
||||
func (it *indexAccessingChunkIterator) rangeValues(in metric.Interval) ([]model.SamplePair, error) {
|
||||
oldest := sort.Search(it.len, func(i int) bool {
|
||||
return !it.acc.timestampAtIndex(i).Before(in.OldestInclusive)
|
||||
})
|
||||
newest := sort.Search(it.len, func(i int) bool {
|
||||
return it.acc.timestampAtIndex(i).After(in.NewestInclusive)
|
||||
})
|
||||
if oldest == it.len || it.acc.err() != nil {
|
||||
return nil, it.acc.err()
|
||||
}
|
||||
|
||||
result := make([]model.SamplePair, 0, newest-oldest)
|
||||
for i := oldest; i < newest; i++ {
|
||||
it.pos = i
|
||||
it.lastValue = model.SamplePair{
|
||||
Timestamp: it.acc.timestampAtIndex(i),
|
||||
Value: it.acc.sampleValueAtIndex(i),
|
||||
}
|
||||
result = append(result, it.lastValue)
|
||||
}
|
||||
return result, it.acc.err()
|
||||
}
|
||||
|
||||
// contains implements chunkIterator.
|
||||
func (it *indexAccessingChunkIterator) contains(t model.Time) (bool, error) {
|
||||
return !t.Before(it.acc.timestampAtIndex(0)) &&
|
||||
|
@ -433,6 +403,38 @@ func (it *indexAccessingChunkIterator) scan() bool {
|
|||
return it.acc.err() == nil
|
||||
}
|
||||
|
||||
// findAtOrBefore implements chunkIterator.
|
||||
func (it *indexAccessingChunkIterator) findAtOrBefore(t model.Time) bool {
|
||||
i := sort.Search(it.len, func(i int) bool {
|
||||
return it.acc.timestampAtIndex(i).After(t)
|
||||
})
|
||||
if i == 0 || it.acc.err() != nil {
|
||||
return false
|
||||
}
|
||||
it.pos = i - 1
|
||||
it.lastValue = model.SamplePair{
|
||||
Timestamp: it.acc.timestampAtIndex(i - 1),
|
||||
Value: it.acc.sampleValueAtIndex(i - 1),
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
// findAtOrAfter implements chunkIterator.
|
||||
func (it *indexAccessingChunkIterator) findAtOrAfter(t model.Time) bool {
|
||||
i := sort.Search(it.len, func(i int) bool {
|
||||
return !it.acc.timestampAtIndex(i).Before(t)
|
||||
})
|
||||
if i == it.len || it.acc.err() != nil {
|
||||
return false
|
||||
}
|
||||
it.pos = i
|
||||
it.lastValue = model.SamplePair{
|
||||
Timestamp: it.acc.timestampAtIndex(i),
|
||||
Value: it.acc.sampleValueAtIndex(i),
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
// value implements chunkIterator.
|
||||
func (it *indexAccessingChunkIterator) value() model.SamplePair {
|
||||
return it.lastValue
|
||||
|
|
|
@ -547,12 +547,13 @@ func (it *memorySeriesIterator) ValueAtOrBeforeTime(t model.Time) model.SamplePa
|
|||
return ZeroSamplePair
|
||||
}
|
||||
if containsT {
|
||||
value, err := it.chunkIt.valueAtOrBeforeTime(t)
|
||||
if err != nil {
|
||||
it.quarantine(err)
|
||||
return ZeroSamplePair
|
||||
if it.chunkIt.findAtOrBefore(t) {
|
||||
return it.chunkIt.value()
|
||||
}
|
||||
return value
|
||||
if it.chunkIt.err() != nil {
|
||||
it.quarantine(it.chunkIt.err())
|
||||
}
|
||||
return ZeroSamplePair
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -570,12 +571,13 @@ func (it *memorySeriesIterator) ValueAtOrBeforeTime(t model.Time) model.SamplePa
|
|||
return ZeroSamplePair
|
||||
}
|
||||
it.chunkIt = it.chunkIterator(l - i)
|
||||
value, err := it.chunkIt.valueAtOrBeforeTime(t)
|
||||
if err != nil {
|
||||
it.quarantine(err)
|
||||
return ZeroSamplePair
|
||||
if it.chunkIt.findAtOrBefore(t) {
|
||||
return it.chunkIt.value()
|
||||
}
|
||||
return value
|
||||
if it.chunkIt.err() != nil {
|
||||
it.quarantine(it.chunkIt.err())
|
||||
}
|
||||
return ZeroSamplePair
|
||||
}
|
||||
|
||||
// RangeValues implements SeriesIterator.
|
||||
|
@ -602,7 +604,7 @@ func (it *memorySeriesIterator) RangeValues(in metric.Interval) []model.SamplePa
|
|||
if c.firstTime().After(in.NewestInclusive) {
|
||||
break
|
||||
}
|
||||
chValues, err := it.chunkIterator(i + j).rangeValues(in)
|
||||
chValues, err := rangeValues(it.chunkIterator(i+j), in)
|
||||
if err != nil {
|
||||
it.quarantine(err)
|
||||
return nil
|
||||
|
|
Loading…
Reference in a new issue