mirror of
https://github.com/prometheus/prometheus.git
synced 2025-02-21 03:16:00 -08:00
chunks: implement xor encoding
This commit is contained in:
parent
342aa82505
commit
7874d28f32
192
chunks/chunk.go
192
chunks/chunk.go
|
@ -121,6 +121,198 @@ func (c *rawChunk) append(b []byte) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
type bitChunk struct {
|
||||
d []byte
|
||||
|
||||
sz int
|
||||
pos uint32 // bytes used in the chunk
|
||||
count uint32 // valid bits in last byte
|
||||
|
||||
// Read copies of above values used when retrieving iterators.
|
||||
rl uint32
|
||||
rcount uint32
|
||||
}
|
||||
|
||||
type bit bool
|
||||
|
||||
const (
|
||||
zero bit = false
|
||||
one bit = true
|
||||
)
|
||||
|
||||
func newBitChunk(sz int, enc Encoding) bitChunk {
|
||||
c := bitChunk{d: make([]byte, sz+1), pos: 1, count: 8}
|
||||
c.d[0] = byte(enc)
|
||||
return c
|
||||
}
|
||||
|
||||
func (c *bitChunk) encoding() Encoding {
|
||||
return Encoding(c.d[0])
|
||||
}
|
||||
|
||||
func (c *bitChunk) Data() []byte {
|
||||
return c.d[:c.pos]
|
||||
}
|
||||
|
||||
func (c *bitChunk) reader() *bitChunkReader {
|
||||
fmt.Println(len(c.d), c.pos)
|
||||
return &bitChunkReader{d: c.d[1 : c.pos+1], count: 8}
|
||||
}
|
||||
|
||||
type bitChunkReader struct {
|
||||
d []byte
|
||||
count uint8
|
||||
l uint32
|
||||
}
|
||||
|
||||
func (r *bitChunkReader) readBit() (bit, error) {
|
||||
if len(r.d) == 0 {
|
||||
return false, io.EOF
|
||||
}
|
||||
|
||||
if r.count == 0 {
|
||||
r.d = r.d[1:]
|
||||
// did we just run out of stuff to read?
|
||||
if len(r.d) == 0 {
|
||||
return false, io.EOF
|
||||
}
|
||||
r.count = 8
|
||||
}
|
||||
|
||||
r.count--
|
||||
d := r.d[0] & 0x80
|
||||
r.d[0] <<= 1
|
||||
return d != 0, nil
|
||||
}
|
||||
|
||||
func (r *bitChunkReader) readByte() (byte, error) {
|
||||
if len(r.d) == 0 {
|
||||
return 0, io.EOF
|
||||
}
|
||||
|
||||
if r.count == 0 {
|
||||
r.d = r.d[1:]
|
||||
|
||||
if len(r.d) == 0 {
|
||||
return 0, io.EOF
|
||||
}
|
||||
|
||||
r.count = 8
|
||||
}
|
||||
|
||||
if r.count == 8 {
|
||||
r.count = 0
|
||||
return r.d[0], nil
|
||||
}
|
||||
|
||||
byt := r.d[0]
|
||||
r.d = r.d[1:]
|
||||
|
||||
if len(r.d) == 0 {
|
||||
return 0, io.EOF
|
||||
}
|
||||
|
||||
byt |= r.d[0] >> r.count
|
||||
r.d[0] <<= (8 - r.count)
|
||||
|
||||
return byt, nil
|
||||
}
|
||||
|
||||
func (r *bitChunkReader) readBits(nbits int) (uint64, error) {
|
||||
var u uint64
|
||||
|
||||
for nbits >= 8 {
|
||||
byt, err := r.readByte()
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
u = (u << 8) | uint64(byt)
|
||||
nbits -= 8
|
||||
}
|
||||
|
||||
if nbits == 0 {
|
||||
return u, nil
|
||||
}
|
||||
|
||||
if nbits > int(r.count) {
|
||||
u = (u << uint(r.count)) | uint64(r.d[0]>>(8-r.count))
|
||||
nbits -= int(r.count)
|
||||
r.d = r.d[1:]
|
||||
|
||||
if len(r.d) == 0 {
|
||||
return 0, io.EOF
|
||||
}
|
||||
r.count = 8
|
||||
}
|
||||
|
||||
u = (u << uint(nbits)) | uint64(r.d[0]>>(8-uint(nbits)))
|
||||
r.d[0] <<= uint(nbits)
|
||||
r.count -= uint8(nbits)
|
||||
return u, nil
|
||||
}
|
||||
|
||||
// append appends the first nbits bits from b into the chunk.
|
||||
// b must contain at least nbits bits.
|
||||
// We are using fixed 16 bytes as it might perform better due to
|
||||
// more static assumptions.
|
||||
func (c *bitChunk) append(b [20]byte, nbits int) error {
|
||||
if nbits > 8*(len(c.d)-int(c.pos)-1)-int(c.count) {
|
||||
return ErrChunkFull
|
||||
}
|
||||
|
||||
c.writeBits(b, nbits)
|
||||
// Swap the working length and count integers into the ones used
|
||||
// to retrieve iterators. This allows to concurrently retrieve
|
||||
// iteartors while appending to a chunk.
|
||||
// This does not make it safe for concurrent appends!
|
||||
atomic.StoreUint32(&c.rl, c.pos)
|
||||
atomic.StoreUint32(&c.rcount, c.count)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *bitChunk) writeBit(bit bit) {
|
||||
if c.count == 0 {
|
||||
c.pos++
|
||||
c.count = 8
|
||||
}
|
||||
|
||||
if bit {
|
||||
c.d[c.pos] |= 1 << (c.count - 1)
|
||||
}
|
||||
|
||||
c.count--
|
||||
}
|
||||
|
||||
func (c *bitChunk) writeByte(byt byte) {
|
||||
if c.count == 0 {
|
||||
c.pos++
|
||||
c.count = 8
|
||||
}
|
||||
|
||||
// fill up b.b with b.count bits from byt
|
||||
c.d[c.pos] |= byt >> (8 - c.count)
|
||||
|
||||
c.pos++
|
||||
c.d[c.pos] = byt << c.count
|
||||
}
|
||||
|
||||
func (c *bitChunk) writeBits(b [20]byte, nbits int) {
|
||||
i := 0
|
||||
for nbits >= 8 {
|
||||
c.writeByte(b[i])
|
||||
i++
|
||||
nbits -= 8
|
||||
}
|
||||
|
||||
bi := b[i]
|
||||
for nbits > 0 {
|
||||
c.writeBit((bi >> 7) == 1)
|
||||
bi <<= 1
|
||||
nbits--
|
||||
}
|
||||
}
|
||||
|
||||
// PlainChunk implements a Chunk using simple 16 byte representations
|
||||
// of sample pairs.
|
||||
type PlainChunk struct {
|
||||
|
|
|
@ -105,6 +105,7 @@ func benchmarkIterator(b *testing.B, newChunk func(int) Chunk) {
|
|||
|
||||
b.ReportAllocs()
|
||||
b.ResetTimer()
|
||||
fmt.Println("num", b.N)
|
||||
|
||||
res := make([]model.SamplePair, 0, 1024)
|
||||
for i := 0; i < len(chunks); i++ {
|
||||
|
@ -115,7 +116,7 @@ func benchmarkIterator(b *testing.B, newChunk func(int) Chunk) {
|
|||
res = append(res, s)
|
||||
}
|
||||
if it.Err() != io.EOF {
|
||||
b.Fatal(it.Err())
|
||||
require.NoError(b, it.Err())
|
||||
}
|
||||
res = res[:0]
|
||||
}
|
||||
|
@ -133,6 +134,18 @@ func BenchmarkDoubleDeltaIterator(b *testing.B) {
|
|||
})
|
||||
}
|
||||
|
||||
func BenchmarkXORIterator(b *testing.B) {
|
||||
benchmarkIterator(b, func(sz int) Chunk {
|
||||
return NewXORChunk(sz)
|
||||
})
|
||||
}
|
||||
|
||||
func BenchmarkXORAppender(b *testing.B) {
|
||||
benchmarkAppender(b, func(sz int) Chunk {
|
||||
return NewXORChunk(sz)
|
||||
})
|
||||
}
|
||||
|
||||
func benchmarkAppender(b *testing.B, newChunk func(int) Chunk) {
|
||||
var (
|
||||
baseT = model.Now()
|
||||
|
|
334
chunks/xor.go
334
chunks/xor.go
|
@ -1,63 +1,347 @@
|
|||
package chunks
|
||||
|
||||
import (
|
||||
"encoding/binary"
|
||||
"math"
|
||||
|
||||
bits "github.com/dgryski/go-bits"
|
||||
"github.com/prometheus/common/model"
|
||||
)
|
||||
|
||||
// XORChunk holds XOR encoded sample data.
|
||||
type XORChunk struct {
|
||||
rawChunk
|
||||
num uint16
|
||||
bitChunk
|
||||
}
|
||||
|
||||
// NewXORChunk returns a new chunk with XOR encoding of the given size.
|
||||
func NewXORChunk(sz int) *XORChunk {
|
||||
return &XORChunk{rawChunk: newRawChunk(sz, EncXOR)}
|
||||
return &XORChunk{bitChunk: newBitChunk(sz, EncXOR)}
|
||||
}
|
||||
|
||||
// Appender implements the Chunk interface.
|
||||
func (c *XORChunk) Appender() Appender {
|
||||
return &xorAppender{c: &c.rawChunk}
|
||||
return &xorAppender{c: c, pos: 1}
|
||||
}
|
||||
|
||||
// Iterator implements the Chunk interface.
|
||||
func (c *XORChunk) Iterator() Iterator {
|
||||
return &xorIterator{d: c.d[1:c.l]}
|
||||
return &xorIterator{br: c.bitChunk.reader(), numTotal: c.num}
|
||||
}
|
||||
|
||||
type xorAppender struct {
|
||||
c *rawChunk
|
||||
num int
|
||||
buf [16]byte
|
||||
c *XORChunk
|
||||
|
||||
lastV float64
|
||||
lastT int64
|
||||
lastTDelta uint64
|
||||
t int64
|
||||
v float64
|
||||
buf [20]byte // bits written for current sample. 17 to avoid if condition in hot path.
|
||||
pos uint8 // num of bytes in buf
|
||||
count uint8 // number of bits in last buf byte
|
||||
|
||||
leading uint8
|
||||
trailing uint8
|
||||
finished bool
|
||||
|
||||
tDelta uint64
|
||||
}
|
||||
|
||||
func (a *xorAppender) Append(ts model.Time, v model.SampleValue) error {
|
||||
if a.num == 0 {
|
||||
n := binary.PutVarint(a.buf[:], int64(ts))
|
||||
binary.BigEndian.PutUint64(a.buf[n:], math.Float64bits(float64(v)))
|
||||
if err := a.c.append(a.buf[:n+8]); err != nil {
|
||||
return err
|
||||
// TODO(fabxc): remove Prometheus types from interface.
|
||||
return a.append(int64(ts), float64(v))
|
||||
}
|
||||
|
||||
func (a *xorAppender) append(t int64, v float64) error {
|
||||
// Reset bit buffer.
|
||||
a.buf = [20]byte{}
|
||||
a.count = 8
|
||||
a.pos = 0
|
||||
|
||||
if a.c.num > 1 {
|
||||
tDelta := uint64(t - a.t)
|
||||
dod := int64(tDelta - a.tDelta)
|
||||
|
||||
// Gorilla has a max resolution of seconds, Prometheus milliseconds.
|
||||
// Thus we use higher value range steps with larger bit size.
|
||||
switch {
|
||||
case dod == 0:
|
||||
a.writeBit(zero)
|
||||
case -8191 <= dod && dod <= 8192:
|
||||
a.writeBits(0x02, 2) // '10'
|
||||
a.writeBits(uint64(dod), 14)
|
||||
case -65535 <= dod && dod <= 65536:
|
||||
a.writeBits(0x06, 3) // '110'
|
||||
a.writeBits(uint64(dod), 17)
|
||||
case -524287 <= dod && dod <= 524288:
|
||||
a.writeBits(0x0e, 4) // '1110'
|
||||
a.writeBits(uint64(dod), 20)
|
||||
default:
|
||||
a.writeBits(0x0f, 4) // '1111'
|
||||
a.writeBits(uint64(dod), 64)
|
||||
}
|
||||
a.lastT, a.lastV = int64(ts), float64(v)
|
||||
a.num++
|
||||
return nil
|
||||
}
|
||||
if a.num == 1 {
|
||||
a.lastTDelta = uint64(int64(ts) - a.lastT)
|
||||
a.tDelta = tDelta
|
||||
|
||||
a.writeVDelta(v)
|
||||
|
||||
} else if a.c.num == 0 {
|
||||
// TODO: store varint time?
|
||||
a.writeBits(uint64(t), 64)
|
||||
a.writeBits(math.Float64bits(v), 64)
|
||||
} else {
|
||||
a.tDelta = uint64(t - a.t)
|
||||
// TODO: use varint or other encoding for first delta?
|
||||
a.writeBits(uint64(a.tDelta), 64)
|
||||
a.writeVDelta(v)
|
||||
}
|
||||
|
||||
a.num++
|
||||
if err := a.c.append(a.buf, int(a.pos+1)*8-int(a.count)); err != nil {
|
||||
return err
|
||||
}
|
||||
a.t = t
|
||||
a.v = v
|
||||
a.c.num++
|
||||
// TODO: also preserve tDelta – even though it doesn't really matter at this point.
|
||||
return nil
|
||||
}
|
||||
|
||||
func (a *xorAppender) writeVDelta(v float64) {
|
||||
vDelta := math.Float64bits(v) ^ math.Float64bits(a.v)
|
||||
|
||||
if vDelta == 0 {
|
||||
a.writeBit(zero)
|
||||
return
|
||||
}
|
||||
a.writeBit(one)
|
||||
|
||||
leading := uint8(bits.Clz(vDelta))
|
||||
trailing := uint8(bits.Ctz(vDelta))
|
||||
|
||||
// clamp number of leading zeros to avoid overflow when encoding
|
||||
if leading >= 32 {
|
||||
leading = 31
|
||||
}
|
||||
|
||||
// TODO(dgryski): check if it's 'cheaper' to reset the leading/trailing bits instead
|
||||
if a.leading != ^uint8(0) && leading >= a.leading && trailing >= a.trailing {
|
||||
a.writeBit(zero)
|
||||
a.writeBits(vDelta>>a.trailing, 64-int(a.leading)-int(a.trailing))
|
||||
} else {
|
||||
a.leading, a.trailing = leading, trailing
|
||||
|
||||
a.writeBit(one)
|
||||
a.writeBits(uint64(leading), 5)
|
||||
|
||||
// Note that if leading == trailing == 0, then sigbits == 64. But that value doesn't actually fit into the 6 bits we have.
|
||||
// Luckily, we never need to encode 0 significant bits, since that would put us in the other case (vdelta == 0).
|
||||
// So instead we write out a 0 and adjust it back to 64 on unpacking.
|
||||
sigbits := 64 - leading - trailing
|
||||
a.writeBits(uint64(sigbits), 6)
|
||||
a.writeBits(vDelta>>trailing, int(sigbits))
|
||||
}
|
||||
}
|
||||
|
||||
func (a *xorAppender) writeBits(u uint64, nbits int) {
|
||||
u <<= (64 - uint(nbits))
|
||||
for nbits >= 8 {
|
||||
byt := byte(u >> 56)
|
||||
a.writeByte(byt)
|
||||
u <<= 8
|
||||
nbits -= 8
|
||||
}
|
||||
|
||||
for nbits > 0 {
|
||||
a.writeBit((u >> 63) == 1)
|
||||
u <<= 1
|
||||
nbits--
|
||||
}
|
||||
}
|
||||
|
||||
func (a *xorAppender) writeBit(bit bit) {
|
||||
if a.count == 0 {
|
||||
a.pos++
|
||||
a.count = 8
|
||||
}
|
||||
|
||||
if bit {
|
||||
a.buf[a.pos] |= 1 << (a.count - 1)
|
||||
}
|
||||
|
||||
a.count--
|
||||
}
|
||||
|
||||
func (a *xorAppender) writeByte(byt byte) {
|
||||
if a.count == 0 {
|
||||
a.pos++
|
||||
a.count = 8
|
||||
}
|
||||
|
||||
// fill up b.b with b.count bits from byt
|
||||
a.buf[a.pos] |= byt >> (8 - a.count)
|
||||
|
||||
a.pos++
|
||||
a.buf[a.pos] = byt << a.count
|
||||
}
|
||||
|
||||
type xorIterator struct {
|
||||
d []byte
|
||||
br *bitChunkReader
|
||||
numTotal uint16
|
||||
numRead uint16
|
||||
|
||||
t int64
|
||||
val float64
|
||||
|
||||
leading uint8
|
||||
trailing uint8
|
||||
|
||||
tDelta int64
|
||||
err error
|
||||
}
|
||||
|
||||
func (it *xorIterator) Values() (int64, float64) {
|
||||
return it.t, it.val
|
||||
}
|
||||
|
||||
func (it *xorIterator) NextB() bool {
|
||||
if it.err != nil || it.numRead == it.numTotal {
|
||||
return false
|
||||
}
|
||||
|
||||
var d byte
|
||||
var dod int32
|
||||
var sz uint
|
||||
var tDelta int64
|
||||
|
||||
if it.numRead == 0 {
|
||||
t, err := it.br.readBits(64)
|
||||
if err != nil {
|
||||
it.err = err
|
||||
return false
|
||||
}
|
||||
v, err := it.br.readBits(64)
|
||||
if err != nil {
|
||||
it.err = err
|
||||
return false
|
||||
}
|
||||
it.t = int64(t)
|
||||
it.val = math.Float64frombits(v)
|
||||
|
||||
it.numRead++
|
||||
return true
|
||||
}
|
||||
if it.numRead == 1 {
|
||||
tDelta, err := it.br.readBits(64)
|
||||
if err != nil {
|
||||
it.err = err
|
||||
return false
|
||||
}
|
||||
it.tDelta = int64(tDelta)
|
||||
it.t = it.t + it.tDelta
|
||||
|
||||
goto ReadValue
|
||||
}
|
||||
|
||||
// read delta-of-delta
|
||||
for i := 0; i < 4; i++ {
|
||||
d <<= 1
|
||||
bit, err := it.br.readBit()
|
||||
if err != nil {
|
||||
it.err = err
|
||||
return false
|
||||
}
|
||||
if bit == zero {
|
||||
break
|
||||
}
|
||||
d |= 1
|
||||
}
|
||||
|
||||
switch d {
|
||||
case 0x00:
|
||||
// dod == 0
|
||||
case 0x02:
|
||||
sz = 14
|
||||
case 0x06:
|
||||
sz = 17
|
||||
case 0x0e:
|
||||
sz = 20
|
||||
case 0x0f:
|
||||
bits, err := it.br.readBits(64)
|
||||
if err != nil {
|
||||
it.err = err
|
||||
return false
|
||||
}
|
||||
|
||||
dod = int32(bits)
|
||||
}
|
||||
|
||||
if sz != 0 {
|
||||
bits, err := it.br.readBits(int(sz))
|
||||
if err != nil {
|
||||
it.err = err
|
||||
return false
|
||||
}
|
||||
if bits > (1 << (sz - 1)) {
|
||||
// or something
|
||||
bits = bits - (1 << sz)
|
||||
}
|
||||
dod = int32(bits)
|
||||
}
|
||||
|
||||
tDelta = it.tDelta + int64(dod)
|
||||
|
||||
it.tDelta = tDelta
|
||||
it.t = it.t + it.tDelta
|
||||
|
||||
ReadValue:
|
||||
// read compressed value
|
||||
bit, err := it.br.readBit()
|
||||
if err != nil {
|
||||
it.err = err
|
||||
return false
|
||||
}
|
||||
|
||||
if bit == zero {
|
||||
// it.val = it.val
|
||||
} else {
|
||||
bit, itErr := it.br.readBit()
|
||||
if itErr != nil {
|
||||
it.err = err
|
||||
return false
|
||||
}
|
||||
if bit == zero {
|
||||
// reuse leading/trailing zero bits
|
||||
// it.leading, it.trailing = it.leading, it.trailing
|
||||
} else {
|
||||
bits, err := it.br.readBits(5)
|
||||
if err != nil {
|
||||
it.err = err
|
||||
return false
|
||||
}
|
||||
it.leading = uint8(bits)
|
||||
|
||||
bits, err = it.br.readBits(6)
|
||||
if err != nil {
|
||||
it.err = err
|
||||
return false
|
||||
}
|
||||
mbits := uint8(bits)
|
||||
// 0 significant bits here means we overflowed and we actually need 64; see comment in encoder
|
||||
if mbits == 0 {
|
||||
mbits = 64
|
||||
}
|
||||
it.trailing = 64 - it.leading - mbits
|
||||
}
|
||||
|
||||
mbits := int(64 - it.leading - it.trailing)
|
||||
bits, err := it.br.readBits(mbits)
|
||||
if err != nil {
|
||||
it.err = err
|
||||
return false
|
||||
}
|
||||
vbits := math.Float64bits(it.val)
|
||||
vbits ^= (bits << it.trailing)
|
||||
it.val = math.Float64frombits(vbits)
|
||||
}
|
||||
|
||||
it.numRead++
|
||||
return true
|
||||
}
|
||||
|
||||
func (it *xorIterator) First() (model.SamplePair, bool) {
|
||||
|
@ -73,5 +357,5 @@ func (it *xorIterator) Next() (model.SamplePair, bool) {
|
|||
}
|
||||
|
||||
func (it *xorIterator) Err() error {
|
||||
return nil
|
||||
return it.err
|
||||
}
|
||||
|
|
61
chunks/xor_test.go
Normal file
61
chunks/xor_test.go
Normal file
|
@ -0,0 +1,61 @@
|
|||
package chunks
|
||||
|
||||
import (
|
||||
"math/rand"
|
||||
"testing"
|
||||
|
||||
"github.com/prometheus/common/model"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func testXORChunk(t *testing.T) {
|
||||
ts := model.Time(124213233)
|
||||
v := int64(99954541)
|
||||
|
||||
var input []model.SamplePair
|
||||
for i := 0; i < 10000; i++ {
|
||||
ts += model.Time(rand.Int63n(50000) + 1)
|
||||
v += rand.Int63n(1000)
|
||||
if rand.Int() > 0 {
|
||||
v *= -1
|
||||
}
|
||||
|
||||
input = append(input, model.SamplePair{
|
||||
Timestamp: ts,
|
||||
Value: model.SampleValue(v),
|
||||
})
|
||||
}
|
||||
|
||||
c := NewXORChunk(rand.Intn(3000))
|
||||
|
||||
app := c.Appender()
|
||||
for i, s := range input {
|
||||
err := app.Append(s.Timestamp, s.Value)
|
||||
if err == ErrChunkFull {
|
||||
input = input[:i]
|
||||
break
|
||||
}
|
||||
require.NoError(t, err, "at sample %d: %v", i, s)
|
||||
}
|
||||
|
||||
result := []model.SamplePair{}
|
||||
|
||||
it := c.Iterator().(*xorIterator)
|
||||
for {
|
||||
ok := it.NextB()
|
||||
if !ok {
|
||||
break
|
||||
}
|
||||
t, v := it.Values()
|
||||
result = append(result, model.SamplePair{Timestamp: model.Time(t), Value: model.SampleValue(v)})
|
||||
}
|
||||
|
||||
require.NoError(t, it.Err())
|
||||
require.Equal(t, input, result)
|
||||
}
|
||||
|
||||
func TestXORChunk(t *testing.T) {
|
||||
for i := 0; i < 1000000; i++ {
|
||||
testXORChunk(t)
|
||||
}
|
||||
}
|
Loading…
Reference in a new issue