mirror of
https://github.com/prometheus/prometheus.git
synced 2025-02-21 03:16:00 -08:00
Slim down the chunkIterator interface
For one, remove unneeded methods. Then, instead of using a channel for all values, use a bufio.Scanner-like interface. This removes the need for creating a goroutine and avoids the (unnecessary) locking performed by channel sending and receiving. This will make it much easier to write new chunk implementations (like Gorilla-style encoding).
This commit is contained in:
parent
fc7de5374a
commit
32f280a3cd
|
@ -259,18 +259,13 @@ type chunk interface {
|
||||||
|
|
||||||
// A chunkIterator enables efficient access to the content of a chunk. It is
|
// A chunkIterator enables efficient access to the content of a chunk. It is
|
||||||
// generally not safe to use a chunkIterator concurrently with or after chunk
|
// generally not safe to use a chunkIterator concurrently with or after chunk
|
||||||
// mutation.
|
// mutation. The error returned by any of the methods is always the last error
|
||||||
|
// encountered by the iterator, i.e. once an error has been encountered, no
|
||||||
|
// method will ever return a nil error again. In general, errors signal data
|
||||||
|
// corruption in the chunk and require quarantining.
|
||||||
type chunkIterator interface {
|
type chunkIterator interface {
|
||||||
// length returns the number of samples in the chunk.
|
|
||||||
length() int
|
|
||||||
// Gets the timestamp of the n-th sample in the chunk.
|
|
||||||
timestampAtIndex(int) (model.Time, error)
|
|
||||||
// Gets the last timestamp in the chunk.
|
// Gets the last timestamp in the chunk.
|
||||||
lastTimestamp() (model.Time, error)
|
lastTimestamp() (model.Time, error)
|
||||||
// Gets the sample value of the n-th sample in the chunk.
|
|
||||||
sampleValueAtIndex(int) (model.SampleValue, error)
|
|
||||||
// Gets the last sample value in the chunk.
|
|
||||||
lastSampleValue() (model.SampleValue, error)
|
|
||||||
// Gets the value that is closest before the given time. In case a value
|
// Gets the value that is closest before the given time. In case a value
|
||||||
// exists at precisely the given time, that value is returned. If no
|
// exists at precisely the given time, that value is returned. If no
|
||||||
// applicable value exists, ZeroSamplePair is returned.
|
// applicable value exists, ZeroSamplePair is returned.
|
||||||
|
@ -280,35 +275,48 @@ type chunkIterator interface {
|
||||||
// Whether a given timestamp is contained between first and last value
|
// Whether a given timestamp is contained between first and last value
|
||||||
// in the chunk.
|
// in the chunk.
|
||||||
contains(model.Time) (bool, error)
|
contains(model.Time) (bool, error)
|
||||||
// values returns a channel, from which all sample values in the chunk
|
// scan, value, and err implement a bufio.Scanner-like interface. The
|
||||||
// can be received in order. The channel is closed after the last
|
// scan method advances the iterator to the next value in the chunk and
|
||||||
// one. It is generally not safe to mutate the chunk while the channel
|
// returns true if that worked. In that case, the value method will
|
||||||
// is still open. If a value is returned with error!=nil, no further
|
// return the sample pair the iterator has advanced to. If the scan
|
||||||
// values will be returned and the channel is closed.
|
// method returns false, either an error has occured or the end of the
|
||||||
values() <-chan struct {
|
// chunk has been reached. In the former case, the err method will
|
||||||
model.SamplePair
|
// return the error. In the latter case, the err method will return nil.
|
||||||
error
|
// Upon creation, the iterator is at position "minus one". After the
|
||||||
}
|
// first scan call, value will return the 1st value in the
|
||||||
|
// chunk. valueAtOrBeforeTime and rangeValues all modify the iterator
|
||||||
|
// position, too. They behave as if their return values were retrieved
|
||||||
|
// after a scan call, i.e. calling the value or err methods after a call
|
||||||
|
// to those methods will retrieve the same return value again (or the
|
||||||
|
// last value in the range in case of rangeValues), and subsequent scan
|
||||||
|
// calls will advance the iterator from there.
|
||||||
|
scan() bool
|
||||||
|
value() model.SamplePair
|
||||||
|
err() error
|
||||||
}
|
}
|
||||||
|
|
||||||
func transcodeAndAdd(dst chunk, src chunk, s model.SamplePair) ([]chunk, error) {
|
func transcodeAndAdd(dst chunk, src chunk, s model.SamplePair) ([]chunk, error) {
|
||||||
chunkOps.WithLabelValues(transcode).Inc()
|
chunkOps.WithLabelValues(transcode).Inc()
|
||||||
|
|
||||||
head := dst
|
var (
|
||||||
body := []chunk{}
|
head = dst
|
||||||
for v := range src.newIterator().values() {
|
body, newChunks []chunk
|
||||||
if v.error != nil {
|
err error
|
||||||
return nil, v.error
|
)
|
||||||
}
|
|
||||||
newChunks, err := head.add(v.SamplePair)
|
it := src.newIterator()
|
||||||
if err != nil {
|
for it.scan() {
|
||||||
|
if newChunks, err = head.add(it.value()); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
body = append(body, newChunks[:len(newChunks)-1]...)
|
body = append(body, newChunks[:len(newChunks)-1]...)
|
||||||
head = newChunks[len(newChunks)-1]
|
head = newChunks[len(newChunks)-1]
|
||||||
}
|
}
|
||||||
newChunks, err := head.add(s)
|
if it.err() != nil {
|
||||||
if err != nil {
|
return nil, it.err()
|
||||||
|
}
|
||||||
|
|
||||||
|
if newChunks, err = head.add(s); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return append(body, newChunks...), nil
|
return append(body, newChunks...), nil
|
||||||
|
|
|
@ -209,6 +209,8 @@ func (c *deltaEncodedChunk) newIterator() chunkIterator {
|
||||||
tBytes: c.timeBytes(),
|
tBytes: c.timeBytes(),
|
||||||
vBytes: c.valueBytes(),
|
vBytes: c.valueBytes(),
|
||||||
isInt: c.isInt(),
|
isInt: c.isInt(),
|
||||||
|
pos: -1,
|
||||||
|
lastValue: ZeroSamplePair,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -311,176 +313,130 @@ type deltaEncodedChunkIterator struct {
|
||||||
baseV model.SampleValue
|
baseV model.SampleValue
|
||||||
tBytes, vBytes deltaBytes
|
tBytes, vBytes deltaBytes
|
||||||
isInt bool
|
isInt bool
|
||||||
}
|
pos int
|
||||||
|
lastValue model.SamplePair
|
||||||
// length implements chunkIterator.
|
lastErr error
|
||||||
func (it *deltaEncodedChunkIterator) length() int { return it.len }
|
|
||||||
|
|
||||||
// valueAtOrBeforeTime implements chunkIterator.
|
|
||||||
func (it *deltaEncodedChunkIterator) valueAtOrBeforeTime(t model.Time) (model.SamplePair, error) {
|
|
||||||
var lastErr error
|
|
||||||
i := sort.Search(it.len, func(i int) bool {
|
|
||||||
ts, err := it.timestampAtIndex(i)
|
|
||||||
if err != nil {
|
|
||||||
lastErr = err
|
|
||||||
}
|
|
||||||
return ts.After(t)
|
|
||||||
})
|
|
||||||
if i == 0 || lastErr != nil {
|
|
||||||
return ZeroSamplePair, lastErr
|
|
||||||
}
|
|
||||||
ts, err := it.timestampAtIndex(i - 1)
|
|
||||||
if err != nil {
|
|
||||||
return ZeroSamplePair, err
|
|
||||||
}
|
|
||||||
v, err := it.sampleValueAtIndex(i - 1)
|
|
||||||
if err != nil {
|
|
||||||
return ZeroSamplePair, err
|
|
||||||
}
|
|
||||||
return model.SamplePair{Timestamp: ts, Value: v}, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// rangeValues implements chunkIterator.
|
|
||||||
func (it *deltaEncodedChunkIterator) rangeValues(in metric.Interval) ([]model.SamplePair, error) {
|
|
||||||
var lastErr error
|
|
||||||
|
|
||||||
oldest := sort.Search(it.len, func(i int) bool {
|
|
||||||
t, err := it.timestampAtIndex(i)
|
|
||||||
if err != nil {
|
|
||||||
lastErr = err
|
|
||||||
}
|
|
||||||
return !t.Before(in.OldestInclusive)
|
|
||||||
})
|
|
||||||
|
|
||||||
newest := sort.Search(it.len, func(i int) bool {
|
|
||||||
t, err := it.timestampAtIndex(i)
|
|
||||||
if err != nil {
|
|
||||||
lastErr = err
|
|
||||||
}
|
|
||||||
return t.After(in.NewestInclusive)
|
|
||||||
})
|
|
||||||
|
|
||||||
if oldest == it.len || lastErr != nil {
|
|
||||||
return nil, lastErr
|
|
||||||
}
|
|
||||||
|
|
||||||
result := make([]model.SamplePair, 0, newest-oldest)
|
|
||||||
for i := oldest; i < newest; i++ {
|
|
||||||
t, err := it.timestampAtIndex(i)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
v, err := it.sampleValueAtIndex(i)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
result = append(result, model.SamplePair{Timestamp: t, Value: v})
|
|
||||||
}
|
|
||||||
return result, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// contains implements chunkIterator.
|
|
||||||
func (it *deltaEncodedChunkIterator) contains(t model.Time) (bool, error) {
|
|
||||||
lastT, err := it.timestampAtIndex(it.len - 1)
|
|
||||||
if err != nil {
|
|
||||||
return false, err
|
|
||||||
}
|
|
||||||
return !t.Before(it.baseT) && !t.After(lastT), nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// values implements chunkIterator.
|
|
||||||
func (it *deltaEncodedChunkIterator) values() <-chan struct {
|
|
||||||
model.SamplePair
|
|
||||||
error
|
|
||||||
} {
|
|
||||||
valuesChan := make(chan struct {
|
|
||||||
model.SamplePair
|
|
||||||
error
|
|
||||||
})
|
|
||||||
go func() {
|
|
||||||
for i := 0; i < it.len; i++ {
|
|
||||||
t, err := it.timestampAtIndex(i)
|
|
||||||
if err != nil {
|
|
||||||
valuesChan <- struct {
|
|
||||||
model.SamplePair
|
|
||||||
error
|
|
||||||
}{ZeroSamplePair, err}
|
|
||||||
break
|
|
||||||
}
|
|
||||||
v, err := it.sampleValueAtIndex(i)
|
|
||||||
if err != nil {
|
|
||||||
valuesChan <- struct {
|
|
||||||
model.SamplePair
|
|
||||||
error
|
|
||||||
}{ZeroSamplePair, err}
|
|
||||||
break
|
|
||||||
}
|
|
||||||
valuesChan <- struct {
|
|
||||||
model.SamplePair
|
|
||||||
error
|
|
||||||
}{model.SamplePair{Timestamp: t, Value: v}, nil}
|
|
||||||
}
|
|
||||||
close(valuesChan)
|
|
||||||
}()
|
|
||||||
return valuesChan
|
|
||||||
}
|
|
||||||
|
|
||||||
// timestampAtIndex implements chunkIterator.
|
|
||||||
func (it *deltaEncodedChunkIterator) timestampAtIndex(idx int) (model.Time, error) {
|
|
||||||
offset := deltaHeaderBytes + idx*int(it.tBytes+it.vBytes)
|
|
||||||
|
|
||||||
switch it.tBytes {
|
|
||||||
case d1:
|
|
||||||
return it.baseT + model.Time(uint8(it.c[offset])), nil
|
|
||||||
case d2:
|
|
||||||
return it.baseT + model.Time(binary.LittleEndian.Uint16(it.c[offset:])), nil
|
|
||||||
case d4:
|
|
||||||
return it.baseT + model.Time(binary.LittleEndian.Uint32(it.c[offset:])), nil
|
|
||||||
case d8:
|
|
||||||
// Take absolute value for d8.
|
|
||||||
return model.Time(binary.LittleEndian.Uint64(it.c[offset:])), nil
|
|
||||||
default:
|
|
||||||
return 0, fmt.Errorf("invalid number of bytes for time delta: %d", it.tBytes)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// lastTimestamp implements chunkIterator.
|
// lastTimestamp implements chunkIterator.
|
||||||
func (it *deltaEncodedChunkIterator) lastTimestamp() (model.Time, error) {
|
func (it *deltaEncodedChunkIterator) lastTimestamp() (model.Time, error) {
|
||||||
return it.timestampAtIndex(it.len - 1)
|
return it.timestampAtIndex(it.len - 1), it.lastErr
|
||||||
}
|
}
|
||||||
|
|
||||||
// sampleValueAtIndex implements chunkIterator.
|
// valueAtOrBeforeTime implements chunkIterator.
|
||||||
func (it *deltaEncodedChunkIterator) sampleValueAtIndex(idx int) (model.SampleValue, error) {
|
func (it *deltaEncodedChunkIterator) valueAtOrBeforeTime(t model.Time) (model.SamplePair, error) {
|
||||||
|
i := sort.Search(it.len, func(i int) bool {
|
||||||
|
return it.timestampAtIndex(i).After(t)
|
||||||
|
})
|
||||||
|
if i == 0 || it.lastErr != nil {
|
||||||
|
return ZeroSamplePair, it.lastErr
|
||||||
|
}
|
||||||
|
it.pos = i - 1
|
||||||
|
it.lastValue = model.SamplePair{
|
||||||
|
Timestamp: it.timestampAtIndex(i - 1),
|
||||||
|
Value: it.sampleValueAtIndex(i - 1),
|
||||||
|
}
|
||||||
|
return it.lastValue, it.lastErr
|
||||||
|
}
|
||||||
|
|
||||||
|
// rangeValues implements chunkIterator.
|
||||||
|
func (it *deltaEncodedChunkIterator) rangeValues(in metric.Interval) ([]model.SamplePair, error) {
|
||||||
|
oldest := sort.Search(it.len, func(i int) bool {
|
||||||
|
return !it.timestampAtIndex(i).Before(in.OldestInclusive)
|
||||||
|
})
|
||||||
|
newest := sort.Search(it.len, func(i int) bool {
|
||||||
|
return it.timestampAtIndex(i).After(in.NewestInclusive)
|
||||||
|
})
|
||||||
|
if oldest == it.len || it.lastErr != nil {
|
||||||
|
return nil, it.lastErr
|
||||||
|
}
|
||||||
|
|
||||||
|
result := make([]model.SamplePair, 0, newest-oldest)
|
||||||
|
for i := oldest; i < newest; i++ {
|
||||||
|
it.pos = i
|
||||||
|
it.lastValue = model.SamplePair{
|
||||||
|
Timestamp: it.timestampAtIndex(i),
|
||||||
|
Value: it.sampleValueAtIndex(i),
|
||||||
|
}
|
||||||
|
result = append(result, it.lastValue)
|
||||||
|
}
|
||||||
|
return result, it.lastErr
|
||||||
|
}
|
||||||
|
|
||||||
|
// contains implements chunkIterator.
|
||||||
|
func (it *deltaEncodedChunkIterator) contains(t model.Time) (bool, error) {
|
||||||
|
return !t.Before(it.baseT) && !t.After(it.timestampAtIndex(it.len-1)), it.lastErr
|
||||||
|
}
|
||||||
|
|
||||||
|
// scan implements chunkIterator.
|
||||||
|
func (it *deltaEncodedChunkIterator) scan() bool {
|
||||||
|
it.pos++
|
||||||
|
if it.pos >= it.len {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
it.lastValue = model.SamplePair{
|
||||||
|
Timestamp: it.timestampAtIndex(it.pos),
|
||||||
|
Value: it.sampleValueAtIndex(it.pos),
|
||||||
|
}
|
||||||
|
return it.lastErr == nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// value implements chunkIterator.
|
||||||
|
func (it *deltaEncodedChunkIterator) value() model.SamplePair {
|
||||||
|
return it.lastValue
|
||||||
|
}
|
||||||
|
|
||||||
|
// err implements chunkIterator.
|
||||||
|
func (it *deltaEncodedChunkIterator) err() error {
|
||||||
|
return it.lastErr
|
||||||
|
}
|
||||||
|
|
||||||
|
func (it *deltaEncodedChunkIterator) timestampAtIndex(idx int) model.Time {
|
||||||
|
offset := deltaHeaderBytes + idx*int(it.tBytes+it.vBytes)
|
||||||
|
|
||||||
|
switch it.tBytes {
|
||||||
|
case d1:
|
||||||
|
return it.baseT + model.Time(uint8(it.c[offset]))
|
||||||
|
case d2:
|
||||||
|
return it.baseT + model.Time(binary.LittleEndian.Uint16(it.c[offset:]))
|
||||||
|
case d4:
|
||||||
|
return it.baseT + model.Time(binary.LittleEndian.Uint32(it.c[offset:]))
|
||||||
|
case d8:
|
||||||
|
// Take absolute value for d8.
|
||||||
|
return model.Time(binary.LittleEndian.Uint64(it.c[offset:]))
|
||||||
|
default:
|
||||||
|
it.lastErr = fmt.Errorf("invalid number of bytes for time delta: %d", it.tBytes)
|
||||||
|
}
|
||||||
|
return model.Earliest
|
||||||
|
}
|
||||||
|
|
||||||
|
func (it *deltaEncodedChunkIterator) sampleValueAtIndex(idx int) model.SampleValue {
|
||||||
offset := deltaHeaderBytes + idx*int(it.tBytes+it.vBytes) + int(it.tBytes)
|
offset := deltaHeaderBytes + idx*int(it.tBytes+it.vBytes) + int(it.tBytes)
|
||||||
|
|
||||||
if it.isInt {
|
if it.isInt {
|
||||||
switch it.vBytes {
|
switch it.vBytes {
|
||||||
case d0:
|
case d0:
|
||||||
return it.baseV, nil
|
return it.baseV
|
||||||
case d1:
|
case d1:
|
||||||
return it.baseV + model.SampleValue(int8(it.c[offset])), nil
|
return it.baseV + model.SampleValue(int8(it.c[offset]))
|
||||||
case d2:
|
case d2:
|
||||||
return it.baseV + model.SampleValue(int16(binary.LittleEndian.Uint16(it.c[offset:]))), nil
|
return it.baseV + model.SampleValue(int16(binary.LittleEndian.Uint16(it.c[offset:])))
|
||||||
case d4:
|
case d4:
|
||||||
return it.baseV + model.SampleValue(int32(binary.LittleEndian.Uint32(it.c[offset:]))), nil
|
return it.baseV + model.SampleValue(int32(binary.LittleEndian.Uint32(it.c[offset:])))
|
||||||
// No d8 for ints.
|
// No d8 for ints.
|
||||||
default:
|
default:
|
||||||
return 0, fmt.Errorf("invalid number of bytes for integer delta: %d", it.vBytes)
|
it.lastErr = fmt.Errorf("invalid number of bytes for integer delta: %d", it.vBytes)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
switch it.vBytes {
|
switch it.vBytes {
|
||||||
case d4:
|
case d4:
|
||||||
return it.baseV + model.SampleValue(math.Float32frombits(binary.LittleEndian.Uint32(it.c[offset:]))), nil
|
return it.baseV + model.SampleValue(math.Float32frombits(binary.LittleEndian.Uint32(it.c[offset:])))
|
||||||
case d8:
|
case d8:
|
||||||
// Take absolute value for d8.
|
// Take absolute value for d8.
|
||||||
return model.SampleValue(math.Float64frombits(binary.LittleEndian.Uint64(it.c[offset:]))), nil
|
return model.SampleValue(math.Float64frombits(binary.LittleEndian.Uint64(it.c[offset:])))
|
||||||
default:
|
default:
|
||||||
return 0, fmt.Errorf("invalid number of bytes for floating point delta: %d", it.vBytes)
|
it.lastErr = fmt.Errorf("invalid number of bytes for floating point delta: %d", it.vBytes)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
return 0
|
||||||
|
|
||||||
// lastSampleValue implements chunkIterator.
|
|
||||||
func (it *deltaEncodedChunkIterator) lastSampleValue() (model.SampleValue, error) {
|
|
||||||
return it.sampleValueAtIndex(it.len - 1)
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -217,6 +217,8 @@ func (c *doubleDeltaEncodedChunk) newIterator() chunkIterator {
|
||||||
tBytes: c.timeBytes(),
|
tBytes: c.timeBytes(),
|
||||||
vBytes: c.valueBytes(),
|
vBytes: c.valueBytes(),
|
||||||
isInt: c.isInt(),
|
isInt: c.isInt(),
|
||||||
|
pos: -1,
|
||||||
|
lastValue: ZeroSamplePair,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -417,132 +419,95 @@ type doubleDeltaEncodedChunkIterator struct {
|
||||||
baseV, baseΔV model.SampleValue
|
baseV, baseΔV model.SampleValue
|
||||||
tBytes, vBytes deltaBytes
|
tBytes, vBytes deltaBytes
|
||||||
isInt bool
|
isInt bool
|
||||||
|
pos int
|
||||||
|
lastValue model.SamplePair
|
||||||
|
lastErr error
|
||||||
}
|
}
|
||||||
|
|
||||||
// length implements chunkIterator.
|
// lastTimestamp implements chunkIterator.
|
||||||
func (it *doubleDeltaEncodedChunkIterator) length() int { return it.len }
|
func (it *doubleDeltaEncodedChunkIterator) lastTimestamp() (model.Time, error) {
|
||||||
|
return it.timestampAtIndex(it.len - 1), it.lastErr
|
||||||
|
}
|
||||||
|
|
||||||
// valueAtOrBeforeTime implements chunkIterator.
|
// valueAtOrBeforeTime implements chunkIterator.
|
||||||
func (it *doubleDeltaEncodedChunkIterator) valueAtOrBeforeTime(t model.Time) (model.SamplePair, error) {
|
func (it *doubleDeltaEncodedChunkIterator) valueAtOrBeforeTime(t model.Time) (model.SamplePair, error) {
|
||||||
var lastErr error
|
|
||||||
i := sort.Search(it.len, func(i int) bool {
|
i := sort.Search(it.len, func(i int) bool {
|
||||||
ts, err := it.timestampAtIndex(i)
|
return it.timestampAtIndex(i).After(t)
|
||||||
if err != nil {
|
|
||||||
lastErr = err
|
|
||||||
}
|
|
||||||
return ts.After(t)
|
|
||||||
})
|
})
|
||||||
if i == 0 || lastErr != nil {
|
if i == 0 || it.lastErr != nil {
|
||||||
return ZeroSamplePair, lastErr
|
return ZeroSamplePair, it.lastErr
|
||||||
}
|
}
|
||||||
ts, err := it.timestampAtIndex(i - 1)
|
it.pos = i - 1
|
||||||
if err != nil {
|
it.lastValue = model.SamplePair{
|
||||||
return ZeroSamplePair, err
|
Timestamp: it.timestampAtIndex(i - 1),
|
||||||
|
Value: it.sampleValueAtIndex(i - 1),
|
||||||
}
|
}
|
||||||
v, err := it.sampleValueAtIndex(i - 1)
|
return it.lastValue, it.lastErr
|
||||||
if err != nil {
|
|
||||||
return ZeroSamplePair, err
|
|
||||||
}
|
|
||||||
return model.SamplePair{Timestamp: ts, Value: v}, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// rangeValues implements chunkIterator.
|
// rangeValues implements chunkIterator.
|
||||||
func (it *doubleDeltaEncodedChunkIterator) rangeValues(in metric.Interval) ([]model.SamplePair, error) {
|
func (it *doubleDeltaEncodedChunkIterator) rangeValues(in metric.Interval) ([]model.SamplePair, error) {
|
||||||
var lastErr error
|
|
||||||
|
|
||||||
oldest := sort.Search(it.len, func(i int) bool {
|
oldest := sort.Search(it.len, func(i int) bool {
|
||||||
t, err := it.timestampAtIndex(i)
|
return !it.timestampAtIndex(i).Before(in.OldestInclusive)
|
||||||
if err != nil {
|
|
||||||
lastErr = err
|
|
||||||
}
|
|
||||||
return !t.Before(in.OldestInclusive)
|
|
||||||
})
|
})
|
||||||
|
|
||||||
newest := sort.Search(it.len, func(i int) bool {
|
newest := sort.Search(it.len, func(i int) bool {
|
||||||
t, err := it.timestampAtIndex(i)
|
return it.timestampAtIndex(i).After(in.NewestInclusive)
|
||||||
if err != nil {
|
|
||||||
lastErr = err
|
|
||||||
}
|
|
||||||
return t.After(in.NewestInclusive)
|
|
||||||
})
|
})
|
||||||
|
if oldest == it.len || it.lastErr != nil {
|
||||||
if oldest == it.len || lastErr != nil {
|
return nil, it.lastErr
|
||||||
return nil, lastErr
|
|
||||||
}
|
}
|
||||||
|
|
||||||
result := make([]model.SamplePair, 0, newest-oldest)
|
result := make([]model.SamplePair, 0, newest-oldest)
|
||||||
for i := oldest; i < newest; i++ {
|
for i := oldest; i < newest; i++ {
|
||||||
t, err := it.timestampAtIndex(i)
|
it.pos = i
|
||||||
if err != nil {
|
it.lastValue = model.SamplePair{
|
||||||
return nil, err
|
Timestamp: it.timestampAtIndex(i),
|
||||||
|
Value: it.sampleValueAtIndex(i),
|
||||||
}
|
}
|
||||||
v, err := it.sampleValueAtIndex(i)
|
result = append(result, it.lastValue)
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
}
|
||||||
result = append(result, model.SamplePair{Timestamp: t, Value: v})
|
return result, it.lastErr
|
||||||
}
|
|
||||||
return result, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// contains implements chunkIterator.
|
// contains implements chunkIterator.
|
||||||
func (it *doubleDeltaEncodedChunkIterator) contains(t model.Time) (bool, error) {
|
func (it *doubleDeltaEncodedChunkIterator) contains(t model.Time) (bool, error) {
|
||||||
lastT, err := it.timestampAtIndex(it.len - 1)
|
return !t.Before(it.baseT) && !t.After(it.timestampAtIndex(it.len-1)), it.lastErr
|
||||||
if err != nil {
|
|
||||||
return false, err
|
|
||||||
}
|
|
||||||
return !t.Before(it.baseT) && !t.After(lastT), nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// values implements chunkIterator.
|
// scan implements chunkIterator.
|
||||||
func (it *doubleDeltaEncodedChunkIterator) values() <-chan struct {
|
func (it *doubleDeltaEncodedChunkIterator) scan() bool {
|
||||||
model.SamplePair
|
it.pos++
|
||||||
error
|
if it.pos >= it.len {
|
||||||
} {
|
return false
|
||||||
valuesChan := make(chan struct {
|
|
||||||
model.SamplePair
|
|
||||||
error
|
|
||||||
})
|
|
||||||
go func() {
|
|
||||||
for i := 0; i < it.len; i++ {
|
|
||||||
t, err := it.timestampAtIndex(i)
|
|
||||||
if err != nil {
|
|
||||||
valuesChan <- struct {
|
|
||||||
model.SamplePair
|
|
||||||
error
|
|
||||||
}{ZeroSamplePair, err}
|
|
||||||
break
|
|
||||||
}
|
}
|
||||||
v, err := it.sampleValueAtIndex(i)
|
it.lastValue = model.SamplePair{
|
||||||
if err != nil {
|
Timestamp: it.timestampAtIndex(it.pos),
|
||||||
valuesChan <- struct {
|
Value: it.sampleValueAtIndex(it.pos),
|
||||||
model.SamplePair
|
|
||||||
error
|
|
||||||
}{ZeroSamplePair, err}
|
|
||||||
break
|
|
||||||
}
|
}
|
||||||
valuesChan <- struct {
|
return it.lastErr == nil
|
||||||
model.SamplePair
|
|
||||||
error
|
|
||||||
}{model.SamplePair{Timestamp: t, Value: v}, nil}
|
|
||||||
}
|
|
||||||
close(valuesChan)
|
|
||||||
}()
|
|
||||||
return valuesChan
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// timestampAtIndex implements chunkIterator.
|
// value implements chunkIterator.
|
||||||
func (it *doubleDeltaEncodedChunkIterator) timestampAtIndex(idx int) (model.Time, error) {
|
func (it *doubleDeltaEncodedChunkIterator) value() model.SamplePair {
|
||||||
|
return it.lastValue
|
||||||
|
}
|
||||||
|
|
||||||
|
// err implements chunkIterator.
|
||||||
|
func (it *doubleDeltaEncodedChunkIterator) err() error {
|
||||||
|
return it.lastErr
|
||||||
|
}
|
||||||
|
|
||||||
|
func (it *doubleDeltaEncodedChunkIterator) timestampAtIndex(idx int) model.Time {
|
||||||
if idx == 0 {
|
if idx == 0 {
|
||||||
return it.baseT, nil
|
return it.baseT
|
||||||
}
|
}
|
||||||
if idx == 1 {
|
if idx == 1 {
|
||||||
// If time bytes are at d8, the time is saved directly rather
|
// If time bytes are at d8, the time is saved directly rather
|
||||||
// than as a difference.
|
// than as a difference.
|
||||||
if it.tBytes == d8 {
|
if it.tBytes == d8 {
|
||||||
return it.baseΔT, nil
|
return it.baseΔT
|
||||||
}
|
}
|
||||||
return it.baseT + it.baseΔT, nil
|
return it.baseT + it.baseΔT
|
||||||
}
|
}
|
||||||
|
|
||||||
offset := doubleDeltaHeaderBytes + (idx-2)*int(it.tBytes+it.vBytes)
|
offset := doubleDeltaHeaderBytes + (idx-2)*int(it.tBytes+it.vBytes)
|
||||||
|
@ -551,40 +516,35 @@ func (it *doubleDeltaEncodedChunkIterator) timestampAtIndex(idx int) (model.Time
|
||||||
case d1:
|
case d1:
|
||||||
return it.baseT +
|
return it.baseT +
|
||||||
model.Time(idx)*it.baseΔT +
|
model.Time(idx)*it.baseΔT +
|
||||||
model.Time(int8(it.c[offset])), nil
|
model.Time(int8(it.c[offset]))
|
||||||
case d2:
|
case d2:
|
||||||
return it.baseT +
|
return it.baseT +
|
||||||
model.Time(idx)*it.baseΔT +
|
model.Time(idx)*it.baseΔT +
|
||||||
model.Time(int16(binary.LittleEndian.Uint16(it.c[offset:]))), nil
|
model.Time(int16(binary.LittleEndian.Uint16(it.c[offset:])))
|
||||||
case d4:
|
case d4:
|
||||||
return it.baseT +
|
return it.baseT +
|
||||||
model.Time(idx)*it.baseΔT +
|
model.Time(idx)*it.baseΔT +
|
||||||
model.Time(int32(binary.LittleEndian.Uint32(it.c[offset:]))), nil
|
model.Time(int32(binary.LittleEndian.Uint32(it.c[offset:])))
|
||||||
case d8:
|
case d8:
|
||||||
// Take absolute value for d8.
|
// Take absolute value for d8.
|
||||||
return model.Time(binary.LittleEndian.Uint64(it.c[offset:])), nil
|
return model.Time(binary.LittleEndian.Uint64(it.c[offset:]))
|
||||||
default:
|
default:
|
||||||
return 0, fmt.Errorf("invalid number of bytes for time delta: %d", it.tBytes)
|
it.lastErr = fmt.Errorf("invalid number of bytes for time delta: %d", it.tBytes)
|
||||||
}
|
}
|
||||||
|
return model.Earliest
|
||||||
}
|
}
|
||||||
|
|
||||||
// lastTimestamp implements chunkIterator.
|
func (it *doubleDeltaEncodedChunkIterator) sampleValueAtIndex(idx int) model.SampleValue {
|
||||||
func (it *doubleDeltaEncodedChunkIterator) lastTimestamp() (model.Time, error) {
|
|
||||||
return it.timestampAtIndex(it.len - 1)
|
|
||||||
}
|
|
||||||
|
|
||||||
// sampleValueAtIndex implements chunkIterator.
|
|
||||||
func (it *doubleDeltaEncodedChunkIterator) sampleValueAtIndex(idx int) (model.SampleValue, error) {
|
|
||||||
if idx == 0 {
|
if idx == 0 {
|
||||||
return it.baseV, nil
|
return it.baseV
|
||||||
}
|
}
|
||||||
if idx == 1 {
|
if idx == 1 {
|
||||||
// If value bytes are at d8, the value is saved directly rather
|
// If value bytes are at d8, the value is saved directly rather
|
||||||
// than as a difference.
|
// than as a difference.
|
||||||
if it.vBytes == d8 {
|
if it.vBytes == d8 {
|
||||||
return it.baseΔV, nil
|
return it.baseΔV
|
||||||
}
|
}
|
||||||
return it.baseV + it.baseΔV, nil
|
return it.baseV + it.baseΔV
|
||||||
}
|
}
|
||||||
|
|
||||||
offset := doubleDeltaHeaderBytes + (idx-2)*int(it.tBytes+it.vBytes) + int(it.tBytes)
|
offset := doubleDeltaHeaderBytes + (idx-2)*int(it.tBytes+it.vBytes) + int(it.tBytes)
|
||||||
|
@ -593,39 +553,35 @@ func (it *doubleDeltaEncodedChunkIterator) sampleValueAtIndex(idx int) (model.Sa
|
||||||
switch it.vBytes {
|
switch it.vBytes {
|
||||||
case d0:
|
case d0:
|
||||||
return it.baseV +
|
return it.baseV +
|
||||||
model.SampleValue(idx)*it.baseΔV, nil
|
model.SampleValue(idx)*it.baseΔV
|
||||||
case d1:
|
case d1:
|
||||||
return it.baseV +
|
return it.baseV +
|
||||||
model.SampleValue(idx)*it.baseΔV +
|
model.SampleValue(idx)*it.baseΔV +
|
||||||
model.SampleValue(int8(it.c[offset])), nil
|
model.SampleValue(int8(it.c[offset]))
|
||||||
case d2:
|
case d2:
|
||||||
return it.baseV +
|
return it.baseV +
|
||||||
model.SampleValue(idx)*it.baseΔV +
|
model.SampleValue(idx)*it.baseΔV +
|
||||||
model.SampleValue(int16(binary.LittleEndian.Uint16(it.c[offset:]))), nil
|
model.SampleValue(int16(binary.LittleEndian.Uint16(it.c[offset:])))
|
||||||
case d4:
|
case d4:
|
||||||
return it.baseV +
|
return it.baseV +
|
||||||
model.SampleValue(idx)*it.baseΔV +
|
model.SampleValue(idx)*it.baseΔV +
|
||||||
model.SampleValue(int32(binary.LittleEndian.Uint32(it.c[offset:]))), nil
|
model.SampleValue(int32(binary.LittleEndian.Uint32(it.c[offset:])))
|
||||||
// No d8 for ints.
|
// No d8 for ints.
|
||||||
default:
|
default:
|
||||||
return 0, fmt.Errorf("invalid number of bytes for integer delta: %d", it.vBytes)
|
it.lastErr = fmt.Errorf("invalid number of bytes for integer delta: %d", it.vBytes)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
switch it.vBytes {
|
switch it.vBytes {
|
||||||
case d4:
|
case d4:
|
||||||
return it.baseV +
|
return it.baseV +
|
||||||
model.SampleValue(idx)*it.baseΔV +
|
model.SampleValue(idx)*it.baseΔV +
|
||||||
model.SampleValue(math.Float32frombits(binary.LittleEndian.Uint32(it.c[offset:]))), nil
|
model.SampleValue(math.Float32frombits(binary.LittleEndian.Uint32(it.c[offset:])))
|
||||||
case d8:
|
case d8:
|
||||||
// Take absolute value for d8.
|
// Take absolute value for d8.
|
||||||
return model.SampleValue(math.Float64frombits(binary.LittleEndian.Uint64(it.c[offset:]))), nil
|
return model.SampleValue(math.Float64frombits(binary.LittleEndian.Uint64(it.c[offset:])))
|
||||||
default:
|
default:
|
||||||
return 0, fmt.Errorf("invalid number of bytes for floating point delta: %d", it.vBytes)
|
it.lastErr = fmt.Errorf("invalid number of bytes for floating point delta: %d", it.vBytes)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
return 0
|
||||||
|
|
||||||
// lastSampleValue implements chunkIterator.
|
|
||||||
func (it *doubleDeltaEncodedChunkIterator) lastSampleValue() (model.SampleValue, error) {
|
|
||||||
return it.sampleValueAtIndex(it.len - 1)
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -82,14 +82,14 @@ func buildTestChunks(t *testing.T, encoding chunkEncoding) map[model.Fingerprint
|
||||||
}
|
}
|
||||||
|
|
||||||
func chunksEqual(c1, c2 chunk) bool {
|
func chunksEqual(c1, c2 chunk) bool {
|
||||||
values2 := c2.newIterator().values()
|
it1 := c1.newIterator()
|
||||||
for v1 := range c1.newIterator().values() {
|
it2 := c2.newIterator()
|
||||||
v2 := <-values2
|
for it1.scan() && it2.scan() {
|
||||||
if !(v1 == v2) {
|
if !(it1.value() == it2.value()) {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return true
|
return it1.err() == nil && it2.err() == nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func testPersistLoadDropChunks(t *testing.T, encoding chunkEncoding) {
|
func testPersistLoadDropChunks(t *testing.T, encoding chunkEncoding) {
|
||||||
|
|
|
@ -692,11 +692,12 @@ func testChunk(t *testing.T, encoding chunkEncoding) {
|
||||||
if cd.isEvicted() {
|
if cd.isEvicted() {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
for sample := range cd.c.newIterator().values() {
|
it := cd.c.newIterator()
|
||||||
if sample.error != nil {
|
for it.scan() {
|
||||||
t.Error(sample.error)
|
values = append(values, it.value())
|
||||||
}
|
}
|
||||||
values = append(values, sample.SamplePair)
|
if it.err() != nil {
|
||||||
|
t.Error(it.err())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue