diff --git a/storage/local/chunk.go b/storage/local/chunk.go index e27d066e25..8c5e127b89 100644 --- a/storage/local/chunk.go +++ b/storage/local/chunk.go @@ -27,7 +27,7 @@ import ( "github.com/prometheus/prometheus/storage/metric" ) -// The DefaultChunkEncoding can be changed via a flag. +// DefaultChunkEncoding can be changed via a flag. var DefaultChunkEncoding = doubleDelta var errChunkBoundsExceeded = errors.New("attempted access outside of chunk boundaries") diff --git a/storage/local/gorilla.go b/storage/local/gorilla.go index 4509f62501..809ff1b376 100644 --- a/storage/local/gorilla.go +++ b/storage/local/gorilla.go @@ -35,7 +35,7 @@ import ( // - last time (int64): 8 bytes bit 0128-0191 // - last value (float64): 8 bytes bit 0192-0255 // - first Δt (t1-t0, unsigned): 3 bytes bit 0256-0279 -// - flags (so far just encoding, byte) 1 byte bit 0280-0287 +// - flags (byte) 1 byte bit 0280-0287 // - bit offset for next sample 2 bytes bit 0288-0303 // - first Δv for value encoding 1, otherwise payload // 4 bytes bit 0304-0335 @@ -50,6 +50,17 @@ import ( // - last leading zeros (1 byte) 1 byte bit 8160-8167 // - last significant bits (1 byte) 1 byte bit 8168-8175 // +// FLAGS +// +// The two least significant bits of the flags byte define the value encoding +// for the whole chunk, see below. The most significant byte of the flags byte +// is set if the chunk is closed. No samples can be added anymore to a closed +// chunk. Furthermore, the last value of a closed chunk is only saved in the +// header (last time, last value), while in a chunk that is still open, the last +// sample in the payload is the same sample as saved in the header. +// +// The remaining bits in the flags byte are currently unused. +// // TIMESTAMP ENCODING // // The 1st timestamp is saved directly. @@ -91,8 +102,7 @@ import ( // VALUE ENCODING // // Value encoding can change and is determined by the two least significant bits -// of the 'flags' byte at bit position 280. (The remaining bits could be used -// for other flags in the future.) The encoding can be changed without +// of the 'flags' byte at bit position 280. The encoding can be changed without // transcoding upon adding the 3rd sample. After that, an encoding change // results either in transcoding or in closing the chunk and overflowing into a // new chunk. @@ -158,7 +168,7 @@ import ( const ( gorillaMinLength = 128 - gorillaMaxLength = 8192 + gorillaMaxLength = 8191 // Useful byte offsets. gorillaFirstTimeOffset = 0 @@ -185,8 +195,9 @@ const ( gorillaThirdSampleBitOffset uint16 = gorillaFirstValueDeltaOffset * 8 // If the bit offset for the next sample is above this threshold, no new - // samples can be added to the chunk (because the payload has already - // reached the footer). The chunk is considered closed. + // samples can be added to the chunk's payload (because the payload has + // already reached the footer). However, one more sample can be saved in + // the header as the last sample. gorillaNextSampleBitOffsetThreshold = 8 * gorillaCountOffsetBitOffset gorillaMaxTimeDelta = 1 << 24 // What fits into a 3-byte timestamp. @@ -227,7 +238,7 @@ func newGorillaChunk(enc gorillaValueEncoding) *gorillaChunk { panic(fmt.Errorf("unknown Gorilla value encoding: %v", enc)) } c := make(gorillaChunk, chunkLen) - c[gorillaFlagOffset] = byte(enc) + c.setValueEncoding(enc) return &c } @@ -235,8 +246,10 @@ func newGorillaChunk(enc gorillaValueEncoding) *gorillaChunk { func (c *gorillaChunk) add(s model.SamplePair) ([]chunk, error) { offset := c.nextSampleOffset() switch { - case offset > gorillaNextSampleBitOffsetThreshold: + case c.closed(): return addToOverflowChunk(c, s) + case offset > gorillaNextSampleBitOffsetThreshold: + return c.addLastSample(s), nil case offset == gorillaFirstSampleBitOffset: return c.addFirstSample(s), nil case offset == gorillaSecondSampleBitOffset: @@ -381,6 +394,22 @@ func (c gorillaChunk) setNextSampleOffset(offset uint16) { binary.BigEndian.PutUint16(c[gorillaNextSampleBitOffsetOffset:], offset) } +func (c gorillaChunk) valueEncoding() gorillaValueEncoding { + return gorillaValueEncoding(c[gorillaFlagOffset] & 0x03) +} + +func (c gorillaChunk) setValueEncoding(enc gorillaValueEncoding) { + if enc > gorillaDirectEncoding { + panic("invalid Gorilla value encoding") + } + c[gorillaFlagOffset] &^= 0x03 // Clear. + c[gorillaFlagOffset] |= byte(enc) // Set. +} + +func (c gorillaChunk) closed() bool { + return c[gorillaFlagOffset] > 0x7F // Most significant bit set. +} + func (c gorillaChunk) zeroDDTRepeats() (repeats uint64, offset uint16) { offset = binary.BigEndian.Uint16(c[gorillaCountOffsetBitOffset:]) if offset == 0 { @@ -452,7 +481,9 @@ func (c *gorillaChunk) addSecondSample(s model.SamplePair) ([]chunk, error) { return nil, fmt.Errorf("first Δt is less than zero: %v", firstTimeDelta) } if firstTimeDelta > gorillaMaxTimeDelta { - return addToOverflowChunk(c, s) + // A time delta too great. Still, we can add it as a last sample + // before overflowing. + return c.addLastSample(s), nil } (*c)[gorillaFirstTimeDeltaOffset] = byte(firstTimeDelta >> 16) (*c)[gorillaFirstTimeDeltaOffset+1] = byte(firstTimeDelta >> 8) @@ -467,6 +498,18 @@ func (c *gorillaChunk) addSecondSample(s model.SamplePair) ([]chunk, error) { return []chunk{c}, nil } +// addLastSample isa a helper method only used by c.add() and in other helper +// methods called by c.add(). It simply sets the given sample as the last sample +// in the heador and declares the chunk closed. In other words, addLastSample +// adds the very last sample added to this chunk ever, while setLastSample sets +// the sample most recently added to the chunk so that it can be used for the +// calculations required to add the next sample. +func (c *gorillaChunk) addLastSample(s model.SamplePair) []chunk { + c.setLastSample(s) + (*c)[gorillaFlagOffset] |= 0x80 + return []chunk{c} +} + // addLaterSample is a helper method only used by c.add(). It adds a third or // later sample. func (c *gorillaChunk) addLaterSample(s model.SamplePair, offset uint16) ([]chunk, error) { @@ -475,23 +518,24 @@ func (c *gorillaChunk) addLaterSample(s model.SamplePair, offset uint16) ([]chun lastTimeDelta = c.lastTimeDelta() newTimeDelta = s.Timestamp - lastTime lastValue = c.lastValue() - encoding = gorillaValueEncoding((*c)[gorillaFlagOffset]) + encoding = c.valueEncoding() ) if newTimeDelta < 0 { return nil, fmt.Errorf("Δt is less than zero: %v", newTimeDelta) } - if newTimeDelta > gorillaMaxTimeDelta { - return addToOverflowChunk(c, s) - } - if offset == gorillaThirdSampleBitOffset { offset, encoding = c.prepForThirdSample(lastValue, s.Value, encoding) } + if newTimeDelta > gorillaMaxTimeDelta { + // A time delta too great. Still, we can add it as a last sample + // before overflowing. + return c.addLastSample(s), nil + } - // Analyze worst case, does it fit? If not, overflow into new chunk. + // Analyze worst case, does it fit? If not, set new sample as the last. if int(offset)+gorillaWorstCaseBitsPerSample[encoding] > chunkLen*8 { - return addToOverflowChunk(c, s) + return c.addLastSample(s), nil } // Transcoding/overflow decisions first. @@ -518,7 +562,7 @@ func (c *gorillaChunk) addLaterSample(s model.SamplePair, offset uint16) ([]chun offset, overflow := c.addDDTime(offset, lastTimeDelta, newTimeDelta) if overflow { - return addToOverflowChunk(c, s) + return c.addLastSample(s), nil } switch encoding { case gorillaZeroEncoding: @@ -580,7 +624,7 @@ func (c gorillaChunk) prepForThirdSample( encoding = gorillaXOREncoding offset = c.addXORValue(offset, firstValue, lastValue) } - c[gorillaFlagOffset] = byte(encoding) + c.setValueEncoding(encoding) c.setNextSampleOffset(offset) return offset, encoding } @@ -806,7 +850,17 @@ func (c gorillaChunk) readBitPattern(offset, n uint16) uint64 { } type gorillaChunkIterator struct { - c gorillaChunk + c gorillaChunk + // pos is the bit position within the chunk for the next sample to be + // decoded when scan() is called (i.e. it is _not_ the bit position of + // the sample currently returned by value()). The symbolic values + // gorillaFirstSampleBitOffset and gorillaSecondSampleBitOffset are also + // used for pos. len is the offset of the first bit in the chunk that is + // not part of the payload. If pos==len, then the iterator is positioned + // behind the last sample in the payload. However, the next call of + // scan() still has to check if the chunk is closed, in which case there + // is one more sample, saved in the header. To mark the iterator as + // having scanned that last sample, too, pos is set to len+1. pos, len uint16 t, dT model.Time repeats byte // Repeats of ΔΔt=0. @@ -825,7 +879,7 @@ func newGorillaChunkIterator(c gorillaChunk) *gorillaChunkIterator { c: c, len: c.nextSampleOffset(), t: model.Earliest, - enc: gorillaValueEncoding(c[gorillaFlagOffset]), + enc: c.valueEncoding(), significant: 1, } } @@ -861,9 +915,18 @@ func (it *gorillaChunkIterator) scan() bool { it.rewound = false return true } - if it.pos >= it.len && it.repeats == 0 { + if it.pos > it.len { return false } + if it.pos == it.len && it.repeats == 0 { + it.pos = it.len + 1 + if !it.c.closed() { + return false + } + it.t = it.c.lastTime() + it.v = it.c.lastValue() + return it.lastError == nil + } if it.pos == gorillaFirstSampleBitOffset { it.t = it.c.firstTime() it.v = it.c.firstValue() @@ -871,11 +934,11 @@ func (it *gorillaChunkIterator) scan() bool { return it.lastError == nil } if it.pos == gorillaSecondSampleBitOffset { - if it.len == gorillaThirdSampleBitOffset { + if it.len == gorillaThirdSampleBitOffset && !it.c.closed() { // Special case: Chunk has only two samples. it.t = it.c.lastTime() it.v = it.c.lastValue() - it.pos = it.len + it.pos = it.len + 1 return it.lastError == nil } it.dT = it.c.firstTimeDelta() @@ -929,7 +992,7 @@ func (it *gorillaChunkIterator) findAtOrBefore(t model.Time) bool { if !t.Before(last) { it.t = last it.v = it.c.lastValue() - it.pos = it.len + it.pos = it.len + 1 return true } if t == it.t {