mirror of
https://github.com/prometheus/prometheus.git
synced 2024-11-10 07:34:04 -08:00
Make chunk and series iterators more efficient.
This commit is contained in:
parent
f79c694be5
commit
cd5574bf8a
|
@ -124,7 +124,7 @@ func (cd *chunkDesc) lastTime() clientmodel.Timestamp {
|
|||
if cd.chunk == nil {
|
||||
return cd.chunkLastTime
|
||||
}
|
||||
return cd.chunk.lastTime()
|
||||
return cd.chunk.newIterator().getLastTimestamp()
|
||||
}
|
||||
|
||||
func (cd *chunkDesc) isEvicted() bool {
|
||||
|
@ -169,7 +169,7 @@ func (cd *chunkDesc) maybeEvict() bool {
|
|||
return false
|
||||
}
|
||||
cd.chunkFirstTime = cd.chunk.firstTime()
|
||||
cd.chunkLastTime = cd.chunk.lastTime()
|
||||
cd.chunkLastTime = cd.chunk.newIterator().getLastTimestamp()
|
||||
cd.chunk = nil
|
||||
chunkOps.WithLabelValues(evict).Inc()
|
||||
atomic.AddInt64(&numMemChunks, -1)
|
||||
|
@ -188,23 +188,27 @@ type chunk interface {
|
|||
add(sample *metric.SamplePair) []chunk
|
||||
clone() chunk
|
||||
firstTime() clientmodel.Timestamp
|
||||
lastTime() clientmodel.Timestamp
|
||||
newIterator() chunkIterator
|
||||
marshal(io.Writer) error
|
||||
unmarshal(io.Reader) error
|
||||
unmarshalFromBuf([]byte)
|
||||
encoding() chunkEncoding
|
||||
// values returns a channel, from which all sample values in the chunk
|
||||
// can be received in order. The channel is closed after the last
|
||||
// one. It is generally not safe to mutate the chunk while the channel
|
||||
// is still open.
|
||||
values() <-chan *metric.SamplePair
|
||||
}
|
||||
|
||||
// A chunkIterator enables efficient access to the content of a chunk. It is
|
||||
// generally not safe to use a chunkIterator concurrently with or after chunk
|
||||
// mutation.
|
||||
type chunkIterator interface {
|
||||
// length returns the number of samples in the chunk.
|
||||
length() int
|
||||
// Gets the timestamp of the n-th sample in the chunk.
|
||||
getTimestampAtIndex(int) clientmodel.Timestamp
|
||||
// Gets the last timestamp in the chunk.
|
||||
getLastTimestamp() clientmodel.Timestamp
|
||||
// Gets the sample value of the n-th sample in the chunk.
|
||||
getSampleValueAtIndex(int) clientmodel.SampleValue
|
||||
// Gets the last sample value in the chunk.
|
||||
getLastSampleValue() clientmodel.SampleValue
|
||||
// Gets the two values that are immediately adjacent to a given time. In
|
||||
// case a value exist at precisely the given time, only that single
|
||||
// value is returned. Only the first or last value is returned (as a
|
||||
|
@ -216,6 +220,11 @@ type chunkIterator interface {
|
|||
// Whether a given timestamp is contained between first and last value
|
||||
// in the chunk.
|
||||
contains(clientmodel.Timestamp) bool
|
||||
// values returns a channel, from which all sample values in the chunk
|
||||
// can be received in order. The channel is closed after the last
|
||||
// one. It is generally not safe to mutate the chunk while the channel
|
||||
// is still open.
|
||||
values() <-chan *metric.SamplePair
|
||||
}
|
||||
|
||||
func transcodeAndAdd(dst chunk, src chunk, s *metric.SamplePair) []chunk {
|
||||
|
@ -223,7 +232,7 @@ func transcodeAndAdd(dst chunk, src chunk, s *metric.SamplePair) []chunk {
|
|||
|
||||
head := dst
|
||||
body := []chunk{}
|
||||
for v := range src.values() {
|
||||
for v := range src.newIterator().values() {
|
||||
newChunks := head.add(v)
|
||||
body = append(body, newChunks[:len(newChunks)-1]...)
|
||||
head = newChunks[len(newChunks)-1]
|
||||
|
|
|
@ -188,18 +188,19 @@ func (c deltaEncodedChunk) clone() chunk {
|
|||
|
||||
// firstTime implements chunk.
|
||||
func (c deltaEncodedChunk) firstTime() clientmodel.Timestamp {
|
||||
return c.valueAtIndex(0).Timestamp
|
||||
}
|
||||
|
||||
// lastTime implements chunk.
|
||||
func (c deltaEncodedChunk) lastTime() clientmodel.Timestamp {
|
||||
return c.valueAtIndex(c.len() - 1).Timestamp
|
||||
return c.baseTime()
|
||||
}
|
||||
|
||||
// newIterator implements chunk.
|
||||
func (c *deltaEncodedChunk) newIterator() chunkIterator {
|
||||
return &deltaEncodedChunkIterator{
|
||||
chunk: c,
|
||||
c: *c,
|
||||
len: c.len(),
|
||||
baseT: c.baseTime(),
|
||||
baseV: c.baseValue(),
|
||||
tBytes: c.timeBytes(),
|
||||
vBytes: c.valueBytes(),
|
||||
isInt: c.isInt(),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -237,19 +238,6 @@ func (c *deltaEncodedChunk) unmarshalFromBuf(buf []byte) {
|
|||
*c = (*c)[:binary.LittleEndian.Uint16((*c)[deltaHeaderBufLenOffset:])]
|
||||
}
|
||||
|
||||
// values implements chunk.
|
||||
func (c deltaEncodedChunk) values() <-chan *metric.SamplePair {
|
||||
n := c.len()
|
||||
valuesChan := make(chan *metric.SamplePair)
|
||||
go func() {
|
||||
for i := 0; i < n; i++ {
|
||||
valuesChan <- c.valueAtIndex(i)
|
||||
}
|
||||
close(valuesChan)
|
||||
}()
|
||||
return valuesChan
|
||||
}
|
||||
|
||||
// encoding implements chunk.
|
||||
func (c deltaEncodedChunk) encoding() chunkEncoding { return delta }
|
||||
|
||||
|
@ -284,106 +272,157 @@ func (c deltaEncodedChunk) len() int {
|
|||
return (len(c) - deltaHeaderBytes) / c.sampleSize()
|
||||
}
|
||||
|
||||
func (c deltaEncodedChunk) valueAtIndex(idx int) *metric.SamplePair {
|
||||
offset := deltaHeaderBytes + idx*c.sampleSize()
|
||||
|
||||
var ts clientmodel.Timestamp
|
||||
switch c.timeBytes() {
|
||||
case d1:
|
||||
ts = c.baseTime() + clientmodel.Timestamp(uint8(c[offset]))
|
||||
case d2:
|
||||
ts = c.baseTime() + clientmodel.Timestamp(binary.LittleEndian.Uint16(c[offset:]))
|
||||
case d4:
|
||||
ts = c.baseTime() + clientmodel.Timestamp(binary.LittleEndian.Uint32(c[offset:]))
|
||||
case d8:
|
||||
// Take absolute value for d8.
|
||||
ts = clientmodel.Timestamp(binary.LittleEndian.Uint64(c[offset:]))
|
||||
default:
|
||||
panic("Invalid number of bytes for time delta")
|
||||
}
|
||||
|
||||
offset += int(c.timeBytes())
|
||||
|
||||
var v clientmodel.SampleValue
|
||||
if c.isInt() {
|
||||
switch c.valueBytes() {
|
||||
case d0:
|
||||
v = c.baseValue()
|
||||
case d1:
|
||||
v = c.baseValue() + clientmodel.SampleValue(int8(c[offset]))
|
||||
case d2:
|
||||
v = c.baseValue() + clientmodel.SampleValue(int16(binary.LittleEndian.Uint16(c[offset:])))
|
||||
case d4:
|
||||
v = c.baseValue() + clientmodel.SampleValue(int32(binary.LittleEndian.Uint32(c[offset:])))
|
||||
// No d8 for ints.
|
||||
default:
|
||||
panic("Invalid number of bytes for integer delta")
|
||||
}
|
||||
} else {
|
||||
switch c.valueBytes() {
|
||||
case d4:
|
||||
v = c.baseValue() + clientmodel.SampleValue(math.Float32frombits(binary.LittleEndian.Uint32(c[offset:])))
|
||||
case d8:
|
||||
// Take absolute value for d8.
|
||||
v = clientmodel.SampleValue(math.Float64frombits(binary.LittleEndian.Uint64(c[offset:])))
|
||||
default:
|
||||
panic("Invalid number of bytes for floating point delta")
|
||||
}
|
||||
}
|
||||
return &metric.SamplePair{
|
||||
Timestamp: ts,
|
||||
Value: v,
|
||||
}
|
||||
}
|
||||
|
||||
// deltaEncodedChunkIterator implements chunkIterator.
|
||||
type deltaEncodedChunkIterator struct {
|
||||
chunk *deltaEncodedChunk
|
||||
// TODO: add more fields here to keep track of last position.
|
||||
c deltaEncodedChunk
|
||||
len int
|
||||
baseT clientmodel.Timestamp
|
||||
baseV clientmodel.SampleValue
|
||||
tBytes, vBytes deltaBytes
|
||||
isInt bool
|
||||
}
|
||||
|
||||
// length implements chunkIterator.
|
||||
func (it *deltaEncodedChunkIterator) length() int { return it.len }
|
||||
|
||||
// getValueAtTime implements chunkIterator.
|
||||
func (it *deltaEncodedChunkIterator) getValueAtTime(t clientmodel.Timestamp) metric.Values {
|
||||
i := sort.Search(it.chunk.len(), func(i int) bool {
|
||||
return !it.chunk.valueAtIndex(i).Timestamp.Before(t)
|
||||
i := sort.Search(it.len, func(i int) bool {
|
||||
return !it.getTimestampAtIndex(i).Before(t)
|
||||
})
|
||||
|
||||
switch i {
|
||||
case 0:
|
||||
return metric.Values{*it.chunk.valueAtIndex(0)}
|
||||
case it.chunk.len():
|
||||
return metric.Values{*it.chunk.valueAtIndex(it.chunk.len() - 1)}
|
||||
return metric.Values{metric.SamplePair{
|
||||
Timestamp: it.getTimestampAtIndex(0),
|
||||
Value: it.getSampleValueAtIndex(0),
|
||||
}}
|
||||
case it.len:
|
||||
return metric.Values{metric.SamplePair{
|
||||
Timestamp: it.getTimestampAtIndex(it.len - 1),
|
||||
Value: it.getSampleValueAtIndex(it.len - 1),
|
||||
}}
|
||||
default:
|
||||
v := it.chunk.valueAtIndex(i)
|
||||
if v.Timestamp.Equal(t) {
|
||||
return metric.Values{*v}
|
||||
ts := it.getTimestampAtIndex(i)
|
||||
if ts.Equal(t) {
|
||||
return metric.Values{metric.SamplePair{
|
||||
Timestamp: ts,
|
||||
Value: it.getSampleValueAtIndex(i),
|
||||
}}
|
||||
}
|
||||
return metric.Values{
|
||||
metric.SamplePair{
|
||||
Timestamp: it.getTimestampAtIndex(i - 1),
|
||||
Value: it.getSampleValueAtIndex(i - 1),
|
||||
},
|
||||
metric.SamplePair{
|
||||
Timestamp: ts,
|
||||
Value: it.getSampleValueAtIndex(i),
|
||||
},
|
||||
}
|
||||
return metric.Values{*it.chunk.valueAtIndex(i - 1), *v}
|
||||
}
|
||||
}
|
||||
|
||||
// getRangeValues implements chunkIterator.
|
||||
func (it *deltaEncodedChunkIterator) getRangeValues(in metric.Interval) metric.Values {
|
||||
oldest := sort.Search(it.chunk.len(), func(i int) bool {
|
||||
return !it.chunk.valueAtIndex(i).Timestamp.Before(in.OldestInclusive)
|
||||
oldest := sort.Search(it.len, func(i int) bool {
|
||||
return !it.getTimestampAtIndex(i).Before(in.OldestInclusive)
|
||||
})
|
||||
|
||||
newest := sort.Search(it.chunk.len(), func(i int) bool {
|
||||
return it.chunk.valueAtIndex(i).Timestamp.After(in.NewestInclusive)
|
||||
newest := sort.Search(it.len, func(i int) bool {
|
||||
return it.getTimestampAtIndex(i).After(in.NewestInclusive)
|
||||
})
|
||||
|
||||
if oldest == it.chunk.len() {
|
||||
if oldest == it.len {
|
||||
return nil
|
||||
}
|
||||
|
||||
result := make(metric.Values, 0, newest-oldest)
|
||||
for i := oldest; i < newest; i++ {
|
||||
result = append(result, *it.chunk.valueAtIndex(i))
|
||||
result = append(result, metric.SamplePair{
|
||||
Timestamp: it.getTimestampAtIndex(i),
|
||||
Value: it.getSampleValueAtIndex(i),
|
||||
})
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
// contains implements chunkIterator.
|
||||
func (it *deltaEncodedChunkIterator) contains(t clientmodel.Timestamp) bool {
|
||||
return !t.Before(it.chunk.firstTime()) && !t.After(it.chunk.lastTime())
|
||||
return !t.Before(it.baseT) && !t.After(it.getTimestampAtIndex(it.len-1))
|
||||
}
|
||||
|
||||
// values implements chunkIterator.
|
||||
func (it *deltaEncodedChunkIterator) values() <-chan *metric.SamplePair {
|
||||
valuesChan := make(chan *metric.SamplePair)
|
||||
go func() {
|
||||
for i := 0; i < it.len; i++ {
|
||||
valuesChan <- &metric.SamplePair{
|
||||
Timestamp: it.getTimestampAtIndex(i),
|
||||
Value: it.getSampleValueAtIndex(i),
|
||||
}
|
||||
}
|
||||
close(valuesChan)
|
||||
}()
|
||||
return valuesChan
|
||||
}
|
||||
|
||||
// getTimestampAtIndex implements chunkIterator.
|
||||
func (it *deltaEncodedChunkIterator) getTimestampAtIndex(idx int) clientmodel.Timestamp {
|
||||
offset := deltaHeaderBytes + idx*int(it.tBytes+it.vBytes)
|
||||
|
||||
switch it.tBytes {
|
||||
case d1:
|
||||
return it.baseT + clientmodel.Timestamp(uint8(it.c[offset]))
|
||||
case d2:
|
||||
return it.baseT + clientmodel.Timestamp(binary.LittleEndian.Uint16(it.c[offset:]))
|
||||
case d4:
|
||||
return it.baseT + clientmodel.Timestamp(binary.LittleEndian.Uint32(it.c[offset:]))
|
||||
case d8:
|
||||
// Take absolute value for d8.
|
||||
return clientmodel.Timestamp(binary.LittleEndian.Uint64(it.c[offset:]))
|
||||
default:
|
||||
panic("Invalid number of bytes for time delta")
|
||||
}
|
||||
}
|
||||
|
||||
// getLastTimestamp implements chunkIterator.
|
||||
func (it *deltaEncodedChunkIterator) getLastTimestamp() clientmodel.Timestamp {
|
||||
return it.getTimestampAtIndex(it.len - 1)
|
||||
}
|
||||
|
||||
// getSampleValueAtIndex implements chunkIterator.
|
||||
func (it *deltaEncodedChunkIterator) getSampleValueAtIndex(idx int) clientmodel.SampleValue {
|
||||
offset := deltaHeaderBytes + idx*int(it.tBytes+it.vBytes) + int(it.tBytes)
|
||||
|
||||
if it.isInt {
|
||||
switch it.vBytes {
|
||||
case d0:
|
||||
return it.baseV
|
||||
case d1:
|
||||
return it.baseV + clientmodel.SampleValue(int8(it.c[offset]))
|
||||
case d2:
|
||||
return it.baseV + clientmodel.SampleValue(int16(binary.LittleEndian.Uint16(it.c[offset:])))
|
||||
case d4:
|
||||
return it.baseV + clientmodel.SampleValue(int32(binary.LittleEndian.Uint32(it.c[offset:])))
|
||||
// No d8 for ints.
|
||||
default:
|
||||
panic("Invalid number of bytes for integer delta")
|
||||
}
|
||||
} else {
|
||||
switch it.vBytes {
|
||||
case d4:
|
||||
return it.baseV + clientmodel.SampleValue(math.Float32frombits(binary.LittleEndian.Uint32(it.c[offset:])))
|
||||
case d8:
|
||||
// Take absolute value for d8.
|
||||
return clientmodel.SampleValue(math.Float64frombits(binary.LittleEndian.Uint64(it.c[offset:])))
|
||||
default:
|
||||
panic("Invalid number of bytes for floating point delta")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// getLastSampleValue implements chunkIterator.
|
||||
func (it *deltaEncodedChunkIterator) getLastSampleValue() clientmodel.SampleValue {
|
||||
return it.getSampleValueAtIndex(it.len - 1)
|
||||
}
|
||||
|
|
|
@ -199,15 +199,18 @@ func (c doubleDeltaEncodedChunk) firstTime() clientmodel.Timestamp {
|
|||
return c.baseTime()
|
||||
}
|
||||
|
||||
// lastTime implements chunk.
|
||||
func (c doubleDeltaEncodedChunk) lastTime() clientmodel.Timestamp {
|
||||
return c.valueAtIndex(c.len() - 1).Timestamp
|
||||
}
|
||||
|
||||
// newIterator implements chunk.
|
||||
func (c *doubleDeltaEncodedChunk) newIterator() chunkIterator {
|
||||
return &doubleDeltaEncodedChunkIterator{
|
||||
chunk: c,
|
||||
c: *c,
|
||||
len: c.len(),
|
||||
baseT: c.baseTime(),
|
||||
baseΔT: c.baseTimeDelta(),
|
||||
baseV: c.baseValue(),
|
||||
baseΔV: c.baseValueDelta(),
|
||||
tBytes: c.timeBytes(),
|
||||
vBytes: c.valueBytes(),
|
||||
isInt: c.isInt(),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -245,19 +248,6 @@ func (c *doubleDeltaEncodedChunk) unmarshalFromBuf(buf []byte) {
|
|||
*c = (*c)[:binary.LittleEndian.Uint16((*c)[doubleDeltaHeaderBufLenOffset:])]
|
||||
}
|
||||
|
||||
// values implements chunk.
|
||||
func (c doubleDeltaEncodedChunk) values() <-chan *metric.SamplePair {
|
||||
n := c.len()
|
||||
valuesChan := make(chan *metric.SamplePair)
|
||||
go func() {
|
||||
for i := 0; i < n; i++ {
|
||||
valuesChan <- c.valueAtIndex(i)
|
||||
}
|
||||
close(valuesChan)
|
||||
}()
|
||||
return valuesChan
|
||||
}
|
||||
|
||||
// encoding implements chunk.
|
||||
func (c doubleDeltaEncodedChunk) encoding() chunkEncoding { return doubleDelta }
|
||||
|
||||
|
@ -280,6 +270,9 @@ func (c doubleDeltaEncodedChunk) baseValue() clientmodel.SampleValue {
|
|||
}
|
||||
|
||||
func (c doubleDeltaEncodedChunk) baseTimeDelta() clientmodel.Timestamp {
|
||||
if len(c) < doubleDeltaHeaderBaseTimeDeltaOffset+8 {
|
||||
return 0
|
||||
}
|
||||
return clientmodel.Timestamp(
|
||||
binary.LittleEndian.Uint64(
|
||||
c[doubleDeltaHeaderBaseTimeDeltaOffset:],
|
||||
|
@ -288,6 +281,9 @@ func (c doubleDeltaEncodedChunk) baseTimeDelta() clientmodel.Timestamp {
|
|||
}
|
||||
|
||||
func (c doubleDeltaEncodedChunk) baseValueDelta() clientmodel.SampleValue {
|
||||
if len(c) < doubleDeltaHeaderBaseValueDeltaOffset+8 {
|
||||
return 0
|
||||
}
|
||||
return clientmodel.SampleValue(
|
||||
math.Float64frombits(
|
||||
binary.LittleEndian.Uint64(
|
||||
|
@ -387,120 +383,56 @@ func (c doubleDeltaEncodedChunk) addSecondSample(s *metric.SamplePair, tb, vb de
|
|||
return []chunk{&c}
|
||||
}
|
||||
|
||||
func (c doubleDeltaEncodedChunk) valueAtIndex(idx int) *metric.SamplePair {
|
||||
if idx == 0 {
|
||||
return &metric.SamplePair{
|
||||
Timestamp: c.baseTime(),
|
||||
Value: c.baseValue(),
|
||||
}
|
||||
}
|
||||
if idx == 1 {
|
||||
// If time and/or value bytes are at d8, the time and value is
|
||||
// saved directly rather than as a difference.
|
||||
timestamp := c.baseTimeDelta()
|
||||
if c.timeBytes() < d8 {
|
||||
timestamp += c.baseTime()
|
||||
}
|
||||
value := c.baseValueDelta()
|
||||
if c.valueBytes() < d8 {
|
||||
value += c.baseValue()
|
||||
}
|
||||
return &metric.SamplePair{
|
||||
Timestamp: timestamp,
|
||||
Value: value,
|
||||
}
|
||||
}
|
||||
offset := doubleDeltaHeaderBytes + (idx-2)*c.sampleSize()
|
||||
|
||||
var ts clientmodel.Timestamp
|
||||
switch c.timeBytes() {
|
||||
case d1:
|
||||
ts = c.baseTime() +
|
||||
clientmodel.Timestamp(idx)*c.baseTimeDelta() +
|
||||
clientmodel.Timestamp(int8(c[offset]))
|
||||
case d2:
|
||||
ts = c.baseTime() +
|
||||
clientmodel.Timestamp(idx)*c.baseTimeDelta() +
|
||||
clientmodel.Timestamp(int16(binary.LittleEndian.Uint16(c[offset:])))
|
||||
case d4:
|
||||
ts = c.baseTime() +
|
||||
clientmodel.Timestamp(idx)*c.baseTimeDelta() +
|
||||
clientmodel.Timestamp(int32(binary.LittleEndian.Uint32(c[offset:])))
|
||||
case d8:
|
||||
// Take absolute value for d8.
|
||||
ts = clientmodel.Timestamp(binary.LittleEndian.Uint64(c[offset:]))
|
||||
default:
|
||||
panic("Invalid number of bytes for time delta")
|
||||
}
|
||||
|
||||
offset += int(c.timeBytes())
|
||||
|
||||
var v clientmodel.SampleValue
|
||||
if c.isInt() {
|
||||
switch c.valueBytes() {
|
||||
case d0:
|
||||
v = c.baseValue() +
|
||||
clientmodel.SampleValue(idx)*c.baseValueDelta()
|
||||
case d1:
|
||||
v = c.baseValue() +
|
||||
clientmodel.SampleValue(idx)*c.baseValueDelta() +
|
||||
clientmodel.SampleValue(int8(c[offset]))
|
||||
case d2:
|
||||
v = c.baseValue() +
|
||||
clientmodel.SampleValue(idx)*c.baseValueDelta() +
|
||||
clientmodel.SampleValue(int16(binary.LittleEndian.Uint16(c[offset:])))
|
||||
case d4:
|
||||
v = c.baseValue() +
|
||||
clientmodel.SampleValue(idx)*c.baseValueDelta() +
|
||||
clientmodel.SampleValue(int32(binary.LittleEndian.Uint32(c[offset:])))
|
||||
// No d8 for ints.
|
||||
default:
|
||||
panic("Invalid number of bytes for integer delta")
|
||||
}
|
||||
} else {
|
||||
switch c.valueBytes() {
|
||||
case d4:
|
||||
v = c.baseValue() +
|
||||
clientmodel.SampleValue(idx)*c.baseValueDelta() +
|
||||
clientmodel.SampleValue(math.Float32frombits(binary.LittleEndian.Uint32(c[offset:])))
|
||||
case d8:
|
||||
// Take absolute value for d8.
|
||||
v = clientmodel.SampleValue(math.Float64frombits(binary.LittleEndian.Uint64(c[offset:])))
|
||||
default:
|
||||
panic("Invalid number of bytes for floating point delta")
|
||||
}
|
||||
}
|
||||
return &metric.SamplePair{
|
||||
Timestamp: ts,
|
||||
Value: v,
|
||||
}
|
||||
}
|
||||
|
||||
// doubleDeltaEncodedChunkIterator implements chunkIterator.
|
||||
type doubleDeltaEncodedChunkIterator struct {
|
||||
chunk *doubleDeltaEncodedChunk
|
||||
// TODO(beorn7): add more fields here to keep track of last position.
|
||||
c doubleDeltaEncodedChunk
|
||||
len int
|
||||
baseT, baseΔT clientmodel.Timestamp
|
||||
baseV, baseΔV clientmodel.SampleValue
|
||||
tBytes, vBytes deltaBytes
|
||||
isInt bool
|
||||
}
|
||||
|
||||
// length implements chunkIterator.
|
||||
func (it *doubleDeltaEncodedChunkIterator) length() int { return it.len }
|
||||
|
||||
// getValueAtTime implements chunkIterator.
|
||||
func (it *doubleDeltaEncodedChunkIterator) getValueAtTime(t clientmodel.Timestamp) metric.Values {
|
||||
// TODO(beorn7): Implement in a more efficient way making use of the
|
||||
// state of the iterator and internals of the doubleDeltaChunk.
|
||||
i := sort.Search(it.chunk.len(), func(i int) bool {
|
||||
return !it.chunk.valueAtIndex(i).Timestamp.Before(t)
|
||||
i := sort.Search(it.len, func(i int) bool {
|
||||
return !it.getTimestampAtIndex(i).Before(t)
|
||||
})
|
||||
|
||||
switch i {
|
||||
case 0:
|
||||
return metric.Values{*it.chunk.valueAtIndex(0)}
|
||||
case it.chunk.len():
|
||||
return metric.Values{*it.chunk.valueAtIndex(it.chunk.len() - 1)}
|
||||
return metric.Values{metric.SamplePair{
|
||||
Timestamp: it.getTimestampAtIndex(0),
|
||||
Value: it.getSampleValueAtIndex(0),
|
||||
}}
|
||||
case it.len:
|
||||
return metric.Values{metric.SamplePair{
|
||||
Timestamp: it.getTimestampAtIndex(it.len - 1),
|
||||
Value: it.getSampleValueAtIndex(it.len - 1),
|
||||
}}
|
||||
default:
|
||||
v := it.chunk.valueAtIndex(i)
|
||||
if v.Timestamp.Equal(t) {
|
||||
return metric.Values{*v}
|
||||
ts := it.getTimestampAtIndex(i)
|
||||
if ts.Equal(t) {
|
||||
return metric.Values{metric.SamplePair{
|
||||
Timestamp: ts,
|
||||
Value: it.getSampleValueAtIndex(i),
|
||||
}}
|
||||
}
|
||||
return metric.Values{
|
||||
metric.SamplePair{
|
||||
Timestamp: it.getTimestampAtIndex(i - 1),
|
||||
Value: it.getSampleValueAtIndex(i - 1),
|
||||
},
|
||||
metric.SamplePair{
|
||||
Timestamp: ts,
|
||||
Value: it.getSampleValueAtIndex(i),
|
||||
},
|
||||
}
|
||||
return metric.Values{*it.chunk.valueAtIndex(i - 1), *v}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -508,26 +440,143 @@ func (it *doubleDeltaEncodedChunkIterator) getValueAtTime(t clientmodel.Timestam
|
|||
func (it *doubleDeltaEncodedChunkIterator) getRangeValues(in metric.Interval) metric.Values {
|
||||
// TODO(beorn7): Implement in a more efficient way making use of the
|
||||
// state of the iterator and internals of the doubleDeltaChunk.
|
||||
oldest := sort.Search(it.chunk.len(), func(i int) bool {
|
||||
return !it.chunk.valueAtIndex(i).Timestamp.Before(in.OldestInclusive)
|
||||
oldest := sort.Search(it.len, func(i int) bool {
|
||||
return !it.getTimestampAtIndex(i).Before(in.OldestInclusive)
|
||||
})
|
||||
|
||||
newest := sort.Search(it.chunk.len(), func(i int) bool {
|
||||
return it.chunk.valueAtIndex(i).Timestamp.After(in.NewestInclusive)
|
||||
newest := sort.Search(it.len, func(i int) bool {
|
||||
return it.getTimestampAtIndex(i).After(in.NewestInclusive)
|
||||
})
|
||||
|
||||
if oldest == it.chunk.len() {
|
||||
if oldest == it.len {
|
||||
return nil
|
||||
}
|
||||
|
||||
result := make(metric.Values, 0, newest-oldest)
|
||||
for i := oldest; i < newest; i++ {
|
||||
result = append(result, *it.chunk.valueAtIndex(i))
|
||||
result = append(result, metric.SamplePair{
|
||||
Timestamp: it.getTimestampAtIndex(i),
|
||||
Value: it.getSampleValueAtIndex(i),
|
||||
})
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
// contains implements chunkIterator.
|
||||
func (it *doubleDeltaEncodedChunkIterator) contains(t clientmodel.Timestamp) bool {
|
||||
return !t.Before(it.chunk.firstTime()) && !t.After(it.chunk.lastTime())
|
||||
return !t.Before(it.baseT) && !t.After(it.getTimestampAtIndex(it.len-1))
|
||||
}
|
||||
|
||||
// values implements chunkIterator.
|
||||
func (it *doubleDeltaEncodedChunkIterator) values() <-chan *metric.SamplePair {
|
||||
valuesChan := make(chan *metric.SamplePair)
|
||||
go func() {
|
||||
for i := 0; i < it.len; i++ {
|
||||
valuesChan <- &metric.SamplePair{
|
||||
Timestamp: it.getTimestampAtIndex(i),
|
||||
Value: it.getSampleValueAtIndex(i),
|
||||
}
|
||||
}
|
||||
close(valuesChan)
|
||||
}()
|
||||
return valuesChan
|
||||
}
|
||||
|
||||
// getTimestampAtIndex implements chunkIterator.
|
||||
func (it *doubleDeltaEncodedChunkIterator) getTimestampAtIndex(idx int) clientmodel.Timestamp {
|
||||
if idx == 0 {
|
||||
return it.baseT
|
||||
}
|
||||
if idx == 1 {
|
||||
// If time bytes are at d8, the time is saved directly rather
|
||||
// than as a difference.
|
||||
if it.tBytes == d8 {
|
||||
return it.baseΔT
|
||||
}
|
||||
return it.baseT + it.baseΔT
|
||||
}
|
||||
|
||||
offset := doubleDeltaHeaderBytes + (idx-2)*int(it.tBytes+it.vBytes)
|
||||
|
||||
switch it.tBytes {
|
||||
case d1:
|
||||
return it.baseT +
|
||||
clientmodel.Timestamp(idx)*it.baseΔT +
|
||||
clientmodel.Timestamp(int8(it.c[offset]))
|
||||
case d2:
|
||||
return it.baseT +
|
||||
clientmodel.Timestamp(idx)*it.baseΔT +
|
||||
clientmodel.Timestamp(int16(binary.LittleEndian.Uint16(it.c[offset:])))
|
||||
case d4:
|
||||
return it.baseT +
|
||||
clientmodel.Timestamp(idx)*it.baseΔT +
|
||||
clientmodel.Timestamp(int32(binary.LittleEndian.Uint32(it.c[offset:])))
|
||||
case d8:
|
||||
// Take absolute value for d8.
|
||||
return clientmodel.Timestamp(binary.LittleEndian.Uint64(it.c[offset:]))
|
||||
default:
|
||||
panic("Invalid number of bytes for time delta")
|
||||
}
|
||||
}
|
||||
|
||||
// getLastTimestamp implements chunkIterator.
|
||||
func (it *doubleDeltaEncodedChunkIterator) getLastTimestamp() clientmodel.Timestamp {
|
||||
return it.getTimestampAtIndex(it.len - 1)
|
||||
}
|
||||
|
||||
// getSampleValueAtIndex implements chunkIterator.
|
||||
func (it *doubleDeltaEncodedChunkIterator) getSampleValueAtIndex(idx int) clientmodel.SampleValue {
|
||||
if idx == 0 {
|
||||
return it.baseV
|
||||
}
|
||||
if idx == 1 {
|
||||
// If value bytes are at d8, the value is saved directly rather
|
||||
// than as a difference.
|
||||
if it.vBytes == d8 {
|
||||
return it.baseΔV
|
||||
}
|
||||
return it.baseV + it.baseΔV
|
||||
}
|
||||
|
||||
offset := doubleDeltaHeaderBytes + (idx-2)*int(it.tBytes+it.vBytes) + int(it.tBytes)
|
||||
|
||||
if it.isInt {
|
||||
switch it.vBytes {
|
||||
case d0:
|
||||
return it.baseV +
|
||||
clientmodel.SampleValue(idx)*it.baseΔV
|
||||
case d1:
|
||||
return it.baseV +
|
||||
clientmodel.SampleValue(idx)*it.baseΔV +
|
||||
clientmodel.SampleValue(int8(it.c[offset]))
|
||||
case d2:
|
||||
return it.baseV +
|
||||
clientmodel.SampleValue(idx)*it.baseΔV +
|
||||
clientmodel.SampleValue(int16(binary.LittleEndian.Uint16(it.c[offset:])))
|
||||
case d4:
|
||||
return it.baseV +
|
||||
clientmodel.SampleValue(idx)*it.baseΔV +
|
||||
clientmodel.SampleValue(int32(binary.LittleEndian.Uint32(it.c[offset:])))
|
||||
// No d8 for ints.
|
||||
default:
|
||||
panic("Invalid number of bytes for integer delta")
|
||||
}
|
||||
} else {
|
||||
switch it.vBytes {
|
||||
case d4:
|
||||
return it.baseV +
|
||||
clientmodel.SampleValue(idx)*it.baseΔV +
|
||||
clientmodel.SampleValue(math.Float32frombits(binary.LittleEndian.Uint32(it.c[offset:])))
|
||||
case d8:
|
||||
// Take absolute value for d8.
|
||||
return clientmodel.SampleValue(math.Float64frombits(binary.LittleEndian.Uint64(it.c[offset:])))
|
||||
default:
|
||||
panic("Invalid number of bytes for floating point delta")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// getLastSampleValue implements chunkIterator.
|
||||
func (it *doubleDeltaEncodedChunkIterator) getLastSampleValue() clientmodel.SampleValue {
|
||||
return it.getSampleValueAtIndex(it.len - 1)
|
||||
}
|
||||
|
|
|
@ -880,7 +880,7 @@ func (p *persistence) dropAndPersistChunks(
|
|||
// too old. If that's the case, the chunks in the series file
|
||||
// are all too old, too.
|
||||
i := 0
|
||||
for ; i < len(chunks) && chunks[i].lastTime().Before(beforeTime); i++ {
|
||||
for ; i < len(chunks) && chunks[i].newIterator().getLastTimestamp().Before(beforeTime); i++ {
|
||||
}
|
||||
if i < len(chunks) {
|
||||
firstTimeNotDropped = chunks[i].firstTime()
|
||||
|
@ -1567,8 +1567,14 @@ func chunkIndexForOffset(offset int64) (int, error) {
|
|||
func writeChunkHeader(w io.Writer, c chunk) error {
|
||||
header := make([]byte, chunkHeaderLen)
|
||||
header[chunkHeaderTypeOffset] = byte(c.encoding())
|
||||
binary.LittleEndian.PutUint64(header[chunkHeaderFirstTimeOffset:], uint64(c.firstTime()))
|
||||
binary.LittleEndian.PutUint64(header[chunkHeaderLastTimeOffset:], uint64(c.lastTime()))
|
||||
binary.LittleEndian.PutUint64(
|
||||
header[chunkHeaderFirstTimeOffset:],
|
||||
uint64(c.firstTime()),
|
||||
)
|
||||
binary.LittleEndian.PutUint64(
|
||||
header[chunkHeaderLastTimeOffset:],
|
||||
uint64(c.newIterator().getLastTimestamp()),
|
||||
)
|
||||
_, err := w.Write(header)
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -70,8 +70,8 @@ func buildTestChunks(encoding chunkEncoding) map[clientmodel.Fingerprint][]chunk
|
|||
}
|
||||
|
||||
func chunksEqual(c1, c2 chunk) bool {
|
||||
values2 := c2.values()
|
||||
for v1 := range c1.values() {
|
||||
values2 := c2.newIterator().values()
|
||||
for v1 := range c1.newIterator().values() {
|
||||
v2 := <-values2
|
||||
if !v1.Equal(v2) {
|
||||
return false
|
||||
|
|
|
@ -486,41 +486,57 @@ func (it *memorySeriesIterator) GetValueAtTime(t clientmodel.Timestamp) metric.V
|
|||
return it.chunkIt.getValueAtTime(t)
|
||||
}
|
||||
|
||||
it.chunkIt = nil
|
||||
|
||||
if len(it.chunks) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Before or exactly on the first sample of the series.
|
||||
if !t.After(it.chunks[0].firstTime()) {
|
||||
it.chunkIt = it.chunks[0].newIterator()
|
||||
ts := it.chunkIt.getTimestampAtIndex(0)
|
||||
if !t.After(ts) {
|
||||
// return first value of first chunk
|
||||
return it.chunks[0].newIterator().getValueAtTime(t)
|
||||
}
|
||||
// After or exactly on the last sample of the series.
|
||||
if !t.Before(it.chunks[len(it.chunks)-1].lastTime()) {
|
||||
// return last value of last chunk
|
||||
return it.chunks[len(it.chunks)-1].newIterator().getValueAtTime(t)
|
||||
return metric.Values{metric.SamplePair{
|
||||
Timestamp: ts,
|
||||
Value: it.chunkIt.getSampleValueAtIndex(0),
|
||||
}}
|
||||
}
|
||||
|
||||
// Find first chunk where lastTime() is after or equal to t.
|
||||
// After or exactly on the last sample of the series.
|
||||
it.chunkIt = it.chunks[len(it.chunks)-1].newIterator()
|
||||
ts = it.chunkIt.getLastTimestamp()
|
||||
if !t.Before(ts) {
|
||||
// return last value of last chunk
|
||||
return metric.Values{metric.SamplePair{
|
||||
Timestamp: ts,
|
||||
Value: it.chunkIt.getSampleValueAtIndex(it.chunkIt.length() - 1),
|
||||
}}
|
||||
}
|
||||
|
||||
// Find last chunk where firstTime() is before or equal to t.
|
||||
l := len(it.chunks) - 1
|
||||
i := sort.Search(len(it.chunks), func(i int) bool {
|
||||
return !it.chunks[i].lastTime().Before(t)
|
||||
return !it.chunks[l-i].firstTime().After(t)
|
||||
})
|
||||
if i == len(it.chunks) {
|
||||
panic("out of bounds")
|
||||
}
|
||||
|
||||
if t.Before(it.chunks[i].firstTime()) {
|
||||
it.chunkIt = it.chunks[l-i].newIterator()
|
||||
ts = it.chunkIt.getLastTimestamp()
|
||||
if t.After(ts) {
|
||||
// We ended up between two chunks.
|
||||
sp1 := metric.SamplePair{
|
||||
Timestamp: ts,
|
||||
Value: it.chunkIt.getSampleValueAtIndex(it.chunkIt.length() - 1),
|
||||
}
|
||||
it.chunkIt = it.chunks[l-i+1].newIterator()
|
||||
return metric.Values{
|
||||
it.chunks[i-1].newIterator().getValueAtTime(t)[0],
|
||||
it.chunks[i].newIterator().getValueAtTime(t)[0],
|
||||
sp1,
|
||||
metric.SamplePair{
|
||||
it.chunkIt.getTimestampAtIndex(0),
|
||||
it.chunkIt.getSampleValueAtIndex(0),
|
||||
},
|
||||
}
|
||||
}
|
||||
// We ended up in the middle of a chunk. We might stay there for a while,
|
||||
// so save it as the current chunk iterator.
|
||||
it.chunkIt = it.chunks[i].newIterator()
|
||||
return it.chunkIt.getValueAtTime(t)
|
||||
}
|
||||
|
||||
|
@ -529,26 +545,34 @@ func (it *memorySeriesIterator) GetBoundaryValues(in metric.Interval) metric.Val
|
|||
it.lock()
|
||||
defer it.unlock()
|
||||
|
||||
// Find the first relevant chunk.
|
||||
// Find the first chunk for which the first sample is within the interval.
|
||||
i := sort.Search(len(it.chunks), func(i int) bool {
|
||||
return !it.chunks[i].lastTime().Before(in.OldestInclusive)
|
||||
return !it.chunks[i].firstTime().Before(in.OldestInclusive)
|
||||
})
|
||||
// Only now check the last timestamp of the previous chunk (which is
|
||||
// fairly expensive).
|
||||
if i > 0 && !it.chunks[i-1].newIterator().getLastTimestamp().Before(in.OldestInclusive) {
|
||||
i--
|
||||
}
|
||||
|
||||
values := make(metric.Values, 0, 2)
|
||||
for i, c := range it.chunks[i:] {
|
||||
var chunkIt chunkIterator
|
||||
for j, c := range it.chunks[i:] {
|
||||
if c.firstTime().After(in.NewestInclusive) {
|
||||
if len(values) == 1 {
|
||||
// We found the first value before, but are now
|
||||
// We found the first value before but are now
|
||||
// already past the last value. The value we
|
||||
// want must be the last value of the previous
|
||||
// chunk. So backtrack...
|
||||
chunkIt = it.chunks[i-1].newIterator()
|
||||
values = append(values, chunkIt.getValueAtTime(in.NewestInclusive)[0])
|
||||
chunkIt := it.chunks[j-1].newIterator()
|
||||
values = append(values, metric.SamplePair{
|
||||
Timestamp: chunkIt.getLastTimestamp(),
|
||||
Value: chunkIt.getLastSampleValue(),
|
||||
})
|
||||
}
|
||||
break
|
||||
}
|
||||
chunkIt := c.newIterator()
|
||||
if len(values) == 0 {
|
||||
chunkIt = c.newIterator()
|
||||
firstValues := chunkIt.getValueAtTime(in.OldestInclusive)
|
||||
switch len(firstValues) {
|
||||
case 2:
|
||||
|
@ -559,20 +583,18 @@ func (it *memorySeriesIterator) GetBoundaryValues(in metric.Interval) metric.Val
|
|||
panic("unexpected return from getValueAtTime")
|
||||
}
|
||||
}
|
||||
if c.lastTime().After(in.NewestInclusive) {
|
||||
if chunkIt == nil {
|
||||
chunkIt = c.newIterator()
|
||||
}
|
||||
if chunkIt.getLastTimestamp().After(in.NewestInclusive) {
|
||||
values = append(values, chunkIt.getValueAtTime(in.NewestInclusive)[0])
|
||||
break
|
||||
}
|
||||
}
|
||||
if len(values) == 1 {
|
||||
// We found exactly one value. In that case, add the most recent we know.
|
||||
values = append(
|
||||
values,
|
||||
it.chunks[len(it.chunks)-1].newIterator().getValueAtTime(in.NewestInclusive)[0],
|
||||
)
|
||||
chunkIt := it.chunks[len(it.chunks)-1].newIterator()
|
||||
values = append(values, metric.SamplePair{
|
||||
Timestamp: chunkIt.getLastTimestamp(),
|
||||
Value: chunkIt.getLastSampleValue(),
|
||||
})
|
||||
}
|
||||
if len(values) == 2 && values[0].Equal(&values[1]) {
|
||||
return values[:1]
|
||||
|
@ -585,10 +607,17 @@ func (it *memorySeriesIterator) GetRangeValues(in metric.Interval) metric.Values
|
|||
it.lock()
|
||||
defer it.unlock()
|
||||
|
||||
// Find the first relevant chunk.
|
||||
// Find the first chunk for which the first sample is within the interval.
|
||||
i := sort.Search(len(it.chunks), func(i int) bool {
|
||||
return !it.chunks[i].lastTime().Before(in.OldestInclusive)
|
||||
// TODO: Avoid the expensive newIterator().getLastTimestamp() call.
|
||||
return !it.chunks[i].firstTime().Before(in.OldestInclusive)
|
||||
})
|
||||
// Only now check the last timestamp of the previous chunk (which is
|
||||
// fairly expensive).
|
||||
if i > 0 && !it.chunks[i-1].newIterator().getLastTimestamp().Before(in.OldestInclusive) {
|
||||
i--
|
||||
}
|
||||
|
||||
values := metric.Values{}
|
||||
for _, c := range it.chunks[i:] {
|
||||
if c.firstTime().After(in.NewestInclusive) {
|
||||
|
|
|
@ -208,7 +208,7 @@ func testChunk(t *testing.T, encoding chunkEncoding) {
|
|||
if cd.isEvicted() {
|
||||
continue
|
||||
}
|
||||
for sample := range cd.chunk.values() {
|
||||
for sample := range cd.chunk.newIterator().values() {
|
||||
values = append(values, *sample)
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue