diff --git a/chunks/chunk.go b/chunks/chunk.go
index 2a453d81b2..bcc66c1ff7 100644
--- a/chunks/chunk.go
+++ b/chunks/chunk.go
@@ -121,6 +121,198 @@ func (c *rawChunk) append(b []byte) error {
 	return nil
 }
 
+type bitChunk struct {
+	d []byte
+
+	sz    int
+	pos   uint32 // Index of the byte currently being written.
+	count uint32 // Unwritten bits in the current byte.
+
+	// Read copies of the above values used when retrieving iterators.
+	rl     uint32
+	rcount uint32
+}
+
+type bit bool
+
+const (
+	zero bit = false
+	one  bit = true
+)
+
+func newBitChunk(sz int, enc Encoding) bitChunk {
+	c := bitChunk{d: make([]byte, sz+1), pos: 1, count: 8}
+	c.d[0] = byte(enc)
+	return c
+}
+
+func (c *bitChunk) encoding() Encoding {
+	return Encoding(c.d[0])
+}
+
+func (c *bitChunk) Data() []byte {
+	return c.d[:c.pos]
+}
+
+func (c *bitChunk) reader() *bitChunkReader {
+	return &bitChunkReader{d: c.d[1 : c.pos+1], count: 8}
+}
+
+type bitChunkReader struct {
+	d     []byte
+	count uint8
+	l     uint32
+}
+
+func (r *bitChunkReader) readBit() (bit, error) {
+	if len(r.d) == 0 {
+		return false, io.EOF
+	}
+
+	if r.count == 0 {
+		r.d = r.d[1:]
+		// Did we just run out of stuff to read?
+		if len(r.d) == 0 {
+			return false, io.EOF
+		}
+		r.count = 8
+	}
+
+	r.count--
+	d := r.d[0] & 0x80
+	r.d[0] <<= 1
+	return d != 0, nil
+}
+
+func (r *bitChunkReader) readByte() (byte, error) {
+	if len(r.d) == 0 {
+		return 0, io.EOF
+	}
+
+	if r.count == 0 {
+		r.d = r.d[1:]
+
+		if len(r.d) == 0 {
+			return 0, io.EOF
+		}
+
+		r.count = 8
+	}
+
+	if r.count == 8 {
+		r.count = 0
+		return r.d[0], nil
+	}
+
+	byt := r.d[0]
+	r.d = r.d[1:]
+
+	if len(r.d) == 0 {
+		return 0, io.EOF
+	}
+
+	byt |= r.d[0] >> r.count
+	r.d[0] <<= (8 - r.count)
+
+	return byt, nil
+}
+
+func (r *bitChunkReader) readBits(nbits int) (uint64, error) {
+	var u uint64
+
+	for nbits >= 8 {
+		byt, err := r.readByte()
+		if err != nil {
+			return 0, err
+		}
+
+		u = (u << 8) | uint64(byt)
+		nbits -= 8
+	}
+
+	if nbits == 0 {
+		return u, nil
+	}
+
+	if nbits > int(r.count) {
+		u = (u << uint(r.count)) | uint64(r.d[0]>>(8-r.count))
+		nbits -= int(r.count)
+		r.d = r.d[1:]
+
+		if len(r.d) == 0 {
+			return 0, io.EOF
+		}
+		r.count = 8
+	}
+
+	u = (u << uint(nbits)) | uint64(r.d[0]>>(8-uint(nbits)))
+	r.d[0] <<= uint(nbits)
+	r.count -= uint8(nbits)
+	return u, nil
+}
+
+// append appends the first nbits bits from b into the chunk.
+// b must contain at least nbits bits.
+// Passing a fixed-size array might perform better than a slice
+// due to more static assumptions.
+func (c *bitChunk) append(b [20]byte, nbits int) error {
+	if nbits > 8*(len(c.d)-int(c.pos)-1)-int(c.count) {
+		return ErrChunkFull
+	}
+
+	c.writeBits(b, nbits)
+	// Swap the working length and count integers into the ones used
+	// to retrieve iterators. This allows iterators to be retrieved
+	// concurrently while appending to a chunk.
+	// This does not make it safe for concurrent appends!
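+	// Each store below is individually atomic, so a concurrent reader
+	// never loads a torn uint32; the pair may still be momentarily
+	// inconsistent between the two stores.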
+	atomic.StoreUint32(&c.rl, c.pos)
+	atomic.StoreUint32(&c.rcount, c.count)
+	return nil
+}
+
+func (c *bitChunk) writeBit(bit bit) {
+	if c.count == 0 {
+		c.pos++
+		c.count = 8
+	}
+
+	if bit {
+		c.d[c.pos] |= 1 << (c.count - 1)
+	}
+
+	c.count--
+}
+
+func (c *bitChunk) writeByte(byt byte) {
+	if c.count == 0 {
+		c.pos++
+		c.count = 8
+	}
+
+	// Fill up the current byte with c.count bits from byt.
+	c.d[c.pos] |= byt >> (8 - c.count)
+
+	c.pos++
+	c.d[c.pos] = byt << c.count
+}
+
+func (c *bitChunk) writeBits(b [20]byte, nbits int) {
+	i := 0
+	for nbits >= 8 {
+		c.writeByte(b[i])
+		i++
+		nbits -= 8
+	}
+
+	bi := b[i]
+	for nbits > 0 {
+		c.writeBit((bi >> 7) == 1)
+		bi <<= 1
+		nbits--
+	}
+}
+
 // PlainChunk implements a Chunk using simple 16 byte representations
 // of sample pairs.
 type PlainChunk struct {
diff --git a/chunks/chunk_test.go b/chunks/chunk_test.go
index b351390de4..0e7bb8abd6 100644
--- a/chunks/chunk_test.go
+++ b/chunks/chunk_test.go
@@ -115,7 +115,7 @@
 			res = append(res, s)
 		}
 		if it.Err() != io.EOF {
-			b.Fatal(it.Err())
+			require.NoError(b, it.Err())
 		}
 		res = res[:0]
 	}
@@ -133,6 +133,18 @@
 	})
 }
 
+func BenchmarkXORIterator(b *testing.B) {
+	benchmarkIterator(b, func(sz int) Chunk {
+		return NewXORChunk(sz)
+	})
+}
+
+func BenchmarkXORAppender(b *testing.B) {
+	benchmarkAppender(b, func(sz int) Chunk {
+		return NewXORChunk(sz)
+	})
+}
+
 func benchmarkAppender(b *testing.B, newChunk func(int) Chunk) {
 	var (
 		baseT = model.Now()
diff --git a/chunks/xor.go b/chunks/xor.go
index 13f7b7ca47..d190aa9c4b 100644
--- a/chunks/xor.go
+++ b/chunks/xor.go
@@ -1,63 +1,347 @@
 package chunks
 
 import (
-	"encoding/binary"
 	"math"
 
+	bits "github.com/dgryski/go-bits"
 	"github.com/prometheus/common/model"
 )
 
 // XORChunk holds XOR encoded sample data.
 type XORChunk struct {
-	rawChunk
+	num uint16
+	bitChunk
 }
 
 // NewXORChunk returns a new chunk with XOR encoding of the given size.
 func NewXORChunk(sz int) *XORChunk {
-	return &XORChunk{rawChunk: newRawChunk(sz, EncXOR)}
+	return &XORChunk{bitChunk: newBitChunk(sz, EncXOR)}
 }
 
 // Appender implements the Chunk interface.
 func (c *XORChunk) Appender() Appender {
-	return &xorAppender{c: &c.rawChunk}
+	return &xorAppender{c: c, pos: 1}
 }
 
 // Iterator implements the Chunk interface.
 func (c *XORChunk) Iterator() Iterator {
-	return &xorIterator{d: c.d[1:c.l]}
+	return &xorIterator{br: c.bitChunk.reader(), numTotal: c.num}
 }
 
 type xorAppender struct {
-	c   *rawChunk
-	num int
-	buf [16]byte
+	c *XORChunk
 
-	lastV      float64
-	lastT      int64
-	lastTDelta uint64
+	t int64
+	v float64
+	buf   [20]byte // Bits written for the current sample; sized with slack so writeByte may touch one byte past the last full one in the hot path.
+	pos   uint8    // Index of the current byte in buf.
+	count uint8    // Unwritten bits in the current buf byte.
+
+	leading  uint8
+	trailing uint8
+	finished bool
+
+	tDelta uint64
 }
 
 func (a *xorAppender) Append(ts model.Time, v model.SampleValue) error {
-	if a.num == 0 {
-		n := binary.PutVarint(a.buf[:], int64(ts))
-		binary.BigEndian.PutUint64(a.buf[n:], math.Float64bits(float64(v)))
-		if err := a.c.append(a.buf[:n+8]); err != nil {
-			return err
+	// TODO(fabxc): remove Prometheus types from interface.
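+	// model.Time is an int64 of milliseconds since the epoch and
+	// model.SampleValue is a float64, so both conversions are lossless.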
+	return a.append(int64(ts), float64(v))
+}
+
+func (a *xorAppender) append(t int64, v float64) error {
+	// Reset the bit buffer.
+	a.buf = [20]byte{}
+	a.count = 8
+	a.pos = 0
+
+	if a.c.num > 1 {
+		tDelta := uint64(t - a.t)
+		dod := int64(tDelta - a.tDelta)
+
+		// Gorilla has a maximum resolution of seconds, Prometheus of
+		// milliseconds. Thus we use wider value ranges with
+		// correspondingly larger bit sizes.
+		switch {
+		case dod == 0:
+			a.writeBit(zero)
+		case -8191 <= dod && dod <= 8192:
+			a.writeBits(0x02, 2) // '10'
+			a.writeBits(uint64(dod), 14)
+		case -65535 <= dod && dod <= 65536:
+			a.writeBits(0x06, 3) // '110'
+			a.writeBits(uint64(dod), 17)
+		case -524287 <= dod && dod <= 524288:
+			a.writeBits(0x0e, 4) // '1110'
+			a.writeBits(uint64(dod), 20)
+		default:
+			a.writeBits(0x0f, 4) // '1111'
+			a.writeBits(uint64(dod), 64)
 		}
-		a.lastT, a.lastV = int64(ts), float64(v)
-		a.num++
-		return nil
-	}
-	if a.num == 1 {
-		a.lastTDelta = uint64(int64(ts) - a.lastT)
+		a.tDelta = tDelta
+
+		a.writeVDelta(v)
+
+	} else if a.c.num == 0 {
+		// TODO: store varint time?
+		a.writeBits(uint64(t), 64)
+		a.writeBits(math.Float64bits(v), 64)
+	} else {
+		a.tDelta = uint64(t - a.t)
+		// TODO: use varint or another encoding for the first delta?
+		a.writeBits(a.tDelta, 64)
+		a.writeVDelta(v)
 	}
 
-	a.num++
+	if err := a.c.append(a.buf, int(a.pos+1)*8-int(a.count)); err != nil {
+		return err
+	}
+	a.t = t
+	a.v = v
+	a.c.num++
+	// TODO: also preserve tDelta – even though it doesn't really matter at this point.
 	return nil
 }
 
+func (a *xorAppender) writeVDelta(v float64) {
+	vDelta := math.Float64bits(v) ^ math.Float64bits(a.v)
+
+	if vDelta == 0 {
+		a.writeBit(zero)
+		return
+	}
+	a.writeBit(one)
+
+	leading := uint8(bits.Clz(vDelta))
+	trailing := uint8(bits.Ctz(vDelta))
+
+	// Clamp the number of leading zeros to avoid overflow when encoding.
+	if leading >= 32 {
+		leading = 31
+	}
+
+	// TODO(dgryski): check if it's 'cheaper' to reset the leading/trailing bits instead.
+	if a.leading != ^uint8(0) && leading >= a.leading && trailing >= a.trailing {
+		a.writeBit(zero)
+		a.writeBits(vDelta>>a.trailing, 64-int(a.leading)-int(a.trailing))
+	} else {
+		a.leading, a.trailing = leading, trailing
+
+		a.writeBit(one)
+		a.writeBits(uint64(leading), 5)
+
+		// Note that if leading == trailing == 0, then sigbits == 64. But
+		// that value doesn't actually fit into the 6 bits we have.
+		// Luckily, we never need to encode 0 significant bits, since that
+		// would put us in the other case (vDelta == 0). So instead we
+		// write out a 0 and adjust it back to 64 on unpacking.
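+		// Worked example: leading=12 and trailing=18 give sigbits=34, so
+		// this branch emits the control bit '1', 5 bits of leading count
+		// (12), 6 bits of sigbits (34), and the 34 significant XOR bits:
+		// 46 bits rather than the full 64.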
+		sigbits := 64 - leading - trailing
+		a.writeBits(uint64(sigbits), 6)
+		a.writeBits(vDelta>>trailing, int(sigbits))
+	}
+}
+
+func (a *xorAppender) writeBits(u uint64, nbits int) {
+	u <<= (64 - uint(nbits))
+	for nbits >= 8 {
+		byt := byte(u >> 56)
+		a.writeByte(byt)
+		u <<= 8
+		nbits -= 8
+	}
+
+	for nbits > 0 {
+		a.writeBit((u >> 63) == 1)
+		u <<= 1
+		nbits--
+	}
+}
+
+func (a *xorAppender) writeBit(bit bit) {
+	if a.count == 0 {
+		a.pos++
+		a.count = 8
+	}
+
+	if bit {
+		a.buf[a.pos] |= 1 << (a.count - 1)
+	}
+
+	a.count--
+}
+
+func (a *xorAppender) writeByte(byt byte) {
+	if a.count == 0 {
+		a.pos++
+		a.count = 8
+	}
+
+	// Fill up the current byte with a.count bits from byt.
+	a.buf[a.pos] |= byt >> (8 - a.count)
+
+	a.pos++
+	a.buf[a.pos] = byt << a.count
+}
+
 type xorIterator struct {
-	d []byte
+	br       *bitChunkReader
+	numTotal uint16
+	numRead  uint16
+
+	t   int64
+	val float64
+
+	leading  uint8
+	trailing uint8
+
+	tDelta int64
+	err    error
+}
+
+func (it *xorIterator) Values() (int64, float64) {
+	return it.t, it.val
+}
+
+func (it *xorIterator) NextB() bool {
+	if it.err != nil || it.numRead == it.numTotal {
+		return false
+	}
+
+	var d byte
+	var dod int64
+	var sz uint
+	var tDelta int64
+
+	if it.numRead == 0 {
+		t, err := it.br.readBits(64)
+		if err != nil {
+			it.err = err
+			return false
+		}
+		v, err := it.br.readBits(64)
+		if err != nil {
+			it.err = err
+			return false
+		}
+		it.t = int64(t)
+		it.val = math.Float64frombits(v)
+
+		it.numRead++
+		return true
+	}
+	if it.numRead == 1 {
+		tDelta, err := it.br.readBits(64)
+		if err != nil {
+			it.err = err
+			return false
+		}
+		it.tDelta = int64(tDelta)
+		it.t = it.t + it.tDelta
+
+		goto ReadValue
+	}
+
+	// Read the delta-of-delta control bits.
+	for i := 0; i < 4; i++ {
+		d <<= 1
+		bit, err := it.br.readBit()
+		if err != nil {
+			it.err = err
+			return false
+		}
+		if bit == zero {
+			break
+		}
+		d |= 1
+	}
+
+	switch d {
+	case 0x00:
+		// dod == 0
+	case 0x02:
+		sz = 14
+	case 0x06:
+		sz = 17
+	case 0x0e:
+		sz = 20
+	case 0x0f:
+		bits, err := it.br.readBits(64)
+		if err != nil {
+			it.err = err
+			return false
+		}
+
+		dod = int64(bits)
+	}
+
+	if sz != 0 {
+		bits, err := it.br.readBits(int(sz))
+		if err != nil {
+			it.err = err
+			return false
+		}
+		if bits > (1 << (sz - 1)) {
+			// Sign extend negative values; the encoded range is
+			// asymmetric, so 1<<(sz-1) is still positive.
+			bits = bits - (1 << sz)
+		}
+		dod = int64(bits)
+	}
+
+	tDelta = it.tDelta + dod
+
+	it.tDelta = tDelta
+	it.t = it.t + it.tDelta
+
+ReadValue:
+	// Read the compressed value.
+	bit, err := it.br.readBit()
+	if err != nil {
+		it.err = err
+		return false
+	}
+
+	if bit == zero {
+		// Value is unchanged: it.val stays the same.
+	} else {
+		bit, itErr := it.br.readBit()
+		if itErr != nil {
+			it.err = itErr
+			return false
+		}
+		if bit == zero {
+			// Reuse the leading/trailing zero bits from the previous value.
+		} else {
+			bits, err := it.br.readBits(5)
+			if err != nil {
+				it.err = err
+				return false
+			}
+			it.leading = uint8(bits)
+
+			bits, err = it.br.readBits(6)
+			if err != nil {
+				it.err = err
+				return false
+			}
+			mbits := uint8(bits)
+			// 0 significant bits here means we overflowed and we actually
+			// need 64; see the comment in the encoder.
+			if mbits == 0 {
+				mbits = 64
+			}
+			it.trailing = 64 - it.leading - mbits
+		}
+
+		mbits := int(64 - it.leading - it.trailing)
+		bits, err := it.br.readBits(mbits)
+		if err != nil {
+			it.err = err
+			return false
+		}
+		vbits := math.Float64bits(it.val)
+		vbits ^= (bits << it.trailing)
+		it.val = math.Float64frombits(vbits)
+	}
+
+	it.numRead++
+	return true
+}
 
 func (it *xorIterator) First() (model.SamplePair, bool) {
@@ -73,5 +357,5 @@ func (it *xorIterator) Next() (model.SamplePair, bool) {
 }
 
 func (it *xorIterator) Err() error {
-	return nil
+	return it.err
 }
diff --git a/chunks/xor_test.go b/chunks/xor_test.go
new file mode 100644
index 0000000000..cb5c232826
--- /dev/null
+++ b/chunks/xor_test.go
@@ -0,0 +1,61 @@
+package chunks
+
+import (
+	"math/rand"
+	"testing"
+
+	"github.com/prometheus/common/model"
+	"github.com/stretchr/testify/require"
+)
+
+func testXORChunk(t *testing.T) {
+	ts := model.Time(124213233)
+	v := int64(99954541)
+
+	var input []model.SamplePair
+	for i := 0; i < 10000; i++ {
+		ts += model.Time(rand.Int63n(50000) + 1)
+		v += rand.Int63n(1000)
+		if rand.Intn(2) == 0 {
+			v *= -1
+		}
+
+		input = append(input, model.SamplePair{
+			Timestamp: ts,
+			Value:     model.SampleValue(v),
+		})
+	}
+
+	c := NewXORChunk(rand.Intn(3000) + 1)
+
+	app := c.Appender()
+	for i, s := range input {
+		err := app.Append(s.Timestamp, s.Value)
+		if err == ErrChunkFull {
+			input = input[:i]
+			break
+		}
+		require.NoError(t, err, "at sample %d: %v", i, s)
+	}
+
+	result := []model.SamplePair{}
+
+	it := c.Iterator().(*xorIterator)
+	for {
+		ok := it.NextB()
+		if !ok {
+			break
+		}
+		ts, v := it.Values()
+		result = append(result, model.SamplePair{Timestamp: model.Time(ts), Value: model.SampleValue(v)})
+	}
+
+	require.NoError(t, it.Err())
+	require.Equal(t, input, result)
+}
+
+func TestXORChunk(t *testing.T) {
+	for i := 0; i < 1000000; i++ {
+		testXORChunk(t)
+	}
+}