Remove a redundancy from Gorilla-style chunks

So far, the last sample in a chunk was saved twice. That's required
for adding more samples as we need to know the last sample added to
add more samples without iterating through the whole chunk. However,
once the last sample was added to the chunk before it's full, there is
no need to save it twice. Thus, the very last sample added to a chunk
can _only_ be saved in the header fields for the last sample. The
chunk has to be identifiable as closed, then. This information has
been added to the flags byte.
This commit is contained in:
beorn7 2016-03-20 23:09:48 +01:00
parent b6dbb826ae
commit c72979e3ed
2 changed files with 88 additions and 25 deletions

View file

@ -27,7 +27,7 @@ import (
"github.com/prometheus/prometheus/storage/metric" "github.com/prometheus/prometheus/storage/metric"
) )
// The DefaultChunkEncoding can be changed via a flag. // DefaultChunkEncoding can be changed via a flag.
var DefaultChunkEncoding = doubleDelta var DefaultChunkEncoding = doubleDelta
var errChunkBoundsExceeded = errors.New("attempted access outside of chunk boundaries") var errChunkBoundsExceeded = errors.New("attempted access outside of chunk boundaries")

View file

@ -35,7 +35,7 @@ import (
// - last time (int64): 8 bytes bit 0128-0191 // - last time (int64): 8 bytes bit 0128-0191
// - last value (float64): 8 bytes bit 0192-0255 // - last value (float64): 8 bytes bit 0192-0255
// - first Δt (t1-t0, unsigned): 3 bytes bit 0256-0279 // - first Δt (t1-t0, unsigned): 3 bytes bit 0256-0279
// - flags (so far just encoding, byte) 1 byte bit 0280-0287 // - flags (byte) 1 byte bit 0280-0287
// - bit offset for next sample 2 bytes bit 0288-0303 // - bit offset for next sample 2 bytes bit 0288-0303
// - first Δv for value encoding 1, otherwise payload // - first Δv for value encoding 1, otherwise payload
// 4 bytes bit 0304-0335 // 4 bytes bit 0304-0335
@ -50,6 +50,17 @@ import (
// - last leading zeros (1 byte) 1 byte bit 8160-8167 // - last leading zeros (1 byte) 1 byte bit 8160-8167
// - last significant bits (1 byte) 1 byte bit 8168-8175 // - last significant bits (1 byte) 1 byte bit 8168-8175
// //
// FLAGS
//
// The two least significant bits of the flags byte define the value encoding
// for the whole chunk, see below. The most significant byte of the flags byte
// is set if the chunk is closed. No samples can be added anymore to a closed
// chunk. Furthermore, the last value of a closed chunk is only saved in the
// header (last time, last value), while in a chunk that is still open, the last
// sample in the payload is the same sample as saved in the header.
//
// The remaining bits in the flags byte are currently unused.
//
// TIMESTAMP ENCODING // TIMESTAMP ENCODING
// //
// The 1st timestamp is saved directly. // The 1st timestamp is saved directly.
@ -91,8 +102,7 @@ import (
// VALUE ENCODING // VALUE ENCODING
// //
// Value encoding can change and is determined by the two least significant bits // Value encoding can change and is determined by the two least significant bits
// of the 'flags' byte at bit position 280. (The remaining bits could be used // of the 'flags' byte at bit position 280. The encoding can be changed without
// for other flags in the future.) The encoding can be changed without
// transcoding upon adding the 3rd sample. After that, an encoding change // transcoding upon adding the 3rd sample. After that, an encoding change
// results either in transcoding or in closing the chunk and overflowing into a // results either in transcoding or in closing the chunk and overflowing into a
// new chunk. // new chunk.
@ -158,7 +168,7 @@ import (
const ( const (
gorillaMinLength = 128 gorillaMinLength = 128
gorillaMaxLength = 8192 gorillaMaxLength = 8191
// Useful byte offsets. // Useful byte offsets.
gorillaFirstTimeOffset = 0 gorillaFirstTimeOffset = 0
@ -185,8 +195,9 @@ const (
gorillaThirdSampleBitOffset uint16 = gorillaFirstValueDeltaOffset * 8 gorillaThirdSampleBitOffset uint16 = gorillaFirstValueDeltaOffset * 8
// If the bit offset for the next sample is above this threshold, no new // If the bit offset for the next sample is above this threshold, no new
// samples can be added to the chunk (because the payload has already // samples can be added to the chunk's payload (because the payload has
// reached the footer). The chunk is considered closed. // already reached the footer). However, one more sample can be saved in
// the header as the last sample.
gorillaNextSampleBitOffsetThreshold = 8 * gorillaCountOffsetBitOffset gorillaNextSampleBitOffsetThreshold = 8 * gorillaCountOffsetBitOffset
gorillaMaxTimeDelta = 1 << 24 // What fits into a 3-byte timestamp. gorillaMaxTimeDelta = 1 << 24 // What fits into a 3-byte timestamp.
@ -227,7 +238,7 @@ func newGorillaChunk(enc gorillaValueEncoding) *gorillaChunk {
panic(fmt.Errorf("unknown Gorilla value encoding: %v", enc)) panic(fmt.Errorf("unknown Gorilla value encoding: %v", enc))
} }
c := make(gorillaChunk, chunkLen) c := make(gorillaChunk, chunkLen)
c[gorillaFlagOffset] = byte(enc) c.setValueEncoding(enc)
return &c return &c
} }
@ -235,8 +246,10 @@ func newGorillaChunk(enc gorillaValueEncoding) *gorillaChunk {
func (c *gorillaChunk) add(s model.SamplePair) ([]chunk, error) { func (c *gorillaChunk) add(s model.SamplePair) ([]chunk, error) {
offset := c.nextSampleOffset() offset := c.nextSampleOffset()
switch { switch {
case offset > gorillaNextSampleBitOffsetThreshold: case c.closed():
return addToOverflowChunk(c, s) return addToOverflowChunk(c, s)
case offset > gorillaNextSampleBitOffsetThreshold:
return c.addLastSample(s), nil
case offset == gorillaFirstSampleBitOffset: case offset == gorillaFirstSampleBitOffset:
return c.addFirstSample(s), nil return c.addFirstSample(s), nil
case offset == gorillaSecondSampleBitOffset: case offset == gorillaSecondSampleBitOffset:
@ -381,6 +394,22 @@ func (c gorillaChunk) setNextSampleOffset(offset uint16) {
binary.BigEndian.PutUint16(c[gorillaNextSampleBitOffsetOffset:], offset) binary.BigEndian.PutUint16(c[gorillaNextSampleBitOffsetOffset:], offset)
} }
func (c gorillaChunk) valueEncoding() gorillaValueEncoding {
return gorillaValueEncoding(c[gorillaFlagOffset] & 0x03)
}
func (c gorillaChunk) setValueEncoding(enc gorillaValueEncoding) {
if enc > gorillaDirectEncoding {
panic("invalid Gorilla value encoding")
}
c[gorillaFlagOffset] &^= 0x03 // Clear.
c[gorillaFlagOffset] |= byte(enc) // Set.
}
func (c gorillaChunk) closed() bool {
return c[gorillaFlagOffset] > 0x7F // Most significant bit set.
}
func (c gorillaChunk) zeroDDTRepeats() (repeats uint64, offset uint16) { func (c gorillaChunk) zeroDDTRepeats() (repeats uint64, offset uint16) {
offset = binary.BigEndian.Uint16(c[gorillaCountOffsetBitOffset:]) offset = binary.BigEndian.Uint16(c[gorillaCountOffsetBitOffset:])
if offset == 0 { if offset == 0 {
@ -452,7 +481,9 @@ func (c *gorillaChunk) addSecondSample(s model.SamplePair) ([]chunk, error) {
return nil, fmt.Errorf("first Δt is less than zero: %v", firstTimeDelta) return nil, fmt.Errorf("first Δt is less than zero: %v", firstTimeDelta)
} }
if firstTimeDelta > gorillaMaxTimeDelta { if firstTimeDelta > gorillaMaxTimeDelta {
return addToOverflowChunk(c, s) // A time delta too great. Still, we can add it as a last sample
// before overflowing.
return c.addLastSample(s), nil
} }
(*c)[gorillaFirstTimeDeltaOffset] = byte(firstTimeDelta >> 16) (*c)[gorillaFirstTimeDeltaOffset] = byte(firstTimeDelta >> 16)
(*c)[gorillaFirstTimeDeltaOffset+1] = byte(firstTimeDelta >> 8) (*c)[gorillaFirstTimeDeltaOffset+1] = byte(firstTimeDelta >> 8)
@ -467,6 +498,18 @@ func (c *gorillaChunk) addSecondSample(s model.SamplePair) ([]chunk, error) {
return []chunk{c}, nil return []chunk{c}, nil
} }
// addLastSample isa a helper method only used by c.add() and in other helper
// methods called by c.add(). It simply sets the given sample as the last sample
// in the heador and declares the chunk closed. In other words, addLastSample
// adds the very last sample added to this chunk ever, while setLastSample sets
// the sample most recently added to the chunk so that it can be used for the
// calculations required to add the next sample.
func (c *gorillaChunk) addLastSample(s model.SamplePair) []chunk {
c.setLastSample(s)
(*c)[gorillaFlagOffset] |= 0x80
return []chunk{c}
}
// addLaterSample is a helper method only used by c.add(). It adds a third or // addLaterSample is a helper method only used by c.add(). It adds a third or
// later sample. // later sample.
func (c *gorillaChunk) addLaterSample(s model.SamplePair, offset uint16) ([]chunk, error) { func (c *gorillaChunk) addLaterSample(s model.SamplePair, offset uint16) ([]chunk, error) {
@ -475,23 +518,24 @@ func (c *gorillaChunk) addLaterSample(s model.SamplePair, offset uint16) ([]chun
lastTimeDelta = c.lastTimeDelta() lastTimeDelta = c.lastTimeDelta()
newTimeDelta = s.Timestamp - lastTime newTimeDelta = s.Timestamp - lastTime
lastValue = c.lastValue() lastValue = c.lastValue()
encoding = gorillaValueEncoding((*c)[gorillaFlagOffset]) encoding = c.valueEncoding()
) )
if newTimeDelta < 0 { if newTimeDelta < 0 {
return nil, fmt.Errorf("Δt is less than zero: %v", newTimeDelta) return nil, fmt.Errorf("Δt is less than zero: %v", newTimeDelta)
} }
if newTimeDelta > gorillaMaxTimeDelta {
return addToOverflowChunk(c, s)
}
if offset == gorillaThirdSampleBitOffset { if offset == gorillaThirdSampleBitOffset {
offset, encoding = c.prepForThirdSample(lastValue, s.Value, encoding) offset, encoding = c.prepForThirdSample(lastValue, s.Value, encoding)
} }
if newTimeDelta > gorillaMaxTimeDelta {
// A time delta too great. Still, we can add it as a last sample
// before overflowing.
return c.addLastSample(s), nil
}
// Analyze worst case, does it fit? If not, overflow into new chunk. // Analyze worst case, does it fit? If not, set new sample as the last.
if int(offset)+gorillaWorstCaseBitsPerSample[encoding] > chunkLen*8 { if int(offset)+gorillaWorstCaseBitsPerSample[encoding] > chunkLen*8 {
return addToOverflowChunk(c, s) return c.addLastSample(s), nil
} }
// Transcoding/overflow decisions first. // Transcoding/overflow decisions first.
@ -518,7 +562,7 @@ func (c *gorillaChunk) addLaterSample(s model.SamplePair, offset uint16) ([]chun
offset, overflow := c.addDDTime(offset, lastTimeDelta, newTimeDelta) offset, overflow := c.addDDTime(offset, lastTimeDelta, newTimeDelta)
if overflow { if overflow {
return addToOverflowChunk(c, s) return c.addLastSample(s), nil
} }
switch encoding { switch encoding {
case gorillaZeroEncoding: case gorillaZeroEncoding:
@ -580,7 +624,7 @@ func (c gorillaChunk) prepForThirdSample(
encoding = gorillaXOREncoding encoding = gorillaXOREncoding
offset = c.addXORValue(offset, firstValue, lastValue) offset = c.addXORValue(offset, firstValue, lastValue)
} }
c[gorillaFlagOffset] = byte(encoding) c.setValueEncoding(encoding)
c.setNextSampleOffset(offset) c.setNextSampleOffset(offset)
return offset, encoding return offset, encoding
} }
@ -806,7 +850,17 @@ func (c gorillaChunk) readBitPattern(offset, n uint16) uint64 {
} }
type gorillaChunkIterator struct { type gorillaChunkIterator struct {
c gorillaChunk c gorillaChunk
// pos is the bit position within the chunk for the next sample to be
// decoded when scan() is called (i.e. it is _not_ the bit position of
// the sample currently returned by value()). The symbolic values
// gorillaFirstSampleBitOffset and gorillaSecondSampleBitOffset are also
// used for pos. len is the offset of the first bit in the chunk that is
// not part of the payload. If pos==len, then the iterator is positioned
// behind the last sample in the payload. However, the next call of
// scan() still has to check if the chunk is closed, in which case there
// is one more sample, saved in the header. To mark the iterator as
// having scanned that last sample, too, pos is set to len+1.
pos, len uint16 pos, len uint16
t, dT model.Time t, dT model.Time
repeats byte // Repeats of ΔΔt=0. repeats byte // Repeats of ΔΔt=0.
@ -825,7 +879,7 @@ func newGorillaChunkIterator(c gorillaChunk) *gorillaChunkIterator {
c: c, c: c,
len: c.nextSampleOffset(), len: c.nextSampleOffset(),
t: model.Earliest, t: model.Earliest,
enc: gorillaValueEncoding(c[gorillaFlagOffset]), enc: c.valueEncoding(),
significant: 1, significant: 1,
} }
} }
@ -861,9 +915,18 @@ func (it *gorillaChunkIterator) scan() bool {
it.rewound = false it.rewound = false
return true return true
} }
if it.pos >= it.len && it.repeats == 0 { if it.pos > it.len {
return false return false
} }
if it.pos == it.len && it.repeats == 0 {
it.pos = it.len + 1
if !it.c.closed() {
return false
}
it.t = it.c.lastTime()
it.v = it.c.lastValue()
return it.lastError == nil
}
if it.pos == gorillaFirstSampleBitOffset { if it.pos == gorillaFirstSampleBitOffset {
it.t = it.c.firstTime() it.t = it.c.firstTime()
it.v = it.c.firstValue() it.v = it.c.firstValue()
@ -871,11 +934,11 @@ func (it *gorillaChunkIterator) scan() bool {
return it.lastError == nil return it.lastError == nil
} }
if it.pos == gorillaSecondSampleBitOffset { if it.pos == gorillaSecondSampleBitOffset {
if it.len == gorillaThirdSampleBitOffset { if it.len == gorillaThirdSampleBitOffset && !it.c.closed() {
// Special case: Chunk has only two samples. // Special case: Chunk has only two samples.
it.t = it.c.lastTime() it.t = it.c.lastTime()
it.v = it.c.lastValue() it.v = it.c.lastValue()
it.pos = it.len it.pos = it.len + 1
return it.lastError == nil return it.lastError == nil
} }
it.dT = it.c.firstTimeDelta() it.dT = it.c.firstTimeDelta()
@ -929,7 +992,7 @@ func (it *gorillaChunkIterator) findAtOrBefore(t model.Time) bool {
if !t.Before(last) { if !t.Before(last) {
it.t = last it.t = last
it.v = it.c.lastValue() it.v = it.c.lastValue()
it.pos = it.len it.pos = it.len + 1
return true return true
} }
if t == it.t { if t == it.t {