chunks: cleanup anything but xor encoding

xor encoding is fast enough for our purposes and provides
very good compression.
We remove all other ones that partially don't support floats
for the sake of simplicity.
This commit is contained in:
Fabian Reinartz 2016-11-29 22:02:58 +01:00
parent e67cf768dc
commit fa181a34c1
7 changed files with 160 additions and 653 deletions

View file

@ -145,7 +145,6 @@ func (b *bstream) readByte() (byte, error) {
} }
func (b *bstream) readBits(nbits int) (uint64, error) { func (b *bstream) readBits(nbits int) (uint64, error) {
var u uint64 var u uint64
for nbits >= 8 { for nbits >= 8 {

View file

@ -4,11 +4,6 @@ import (
"encoding/binary" "encoding/binary"
"errors" "errors"
"fmt" "fmt"
"io"
"math"
"sync/atomic"
"github.com/prometheus/common/model"
) )
// Encoding is the identifier for a chunk encoding // Encoding is the identifier for a chunk encoding
@ -18,12 +13,8 @@ func (e Encoding) String() string {
switch e { switch e {
case EncNone: case EncNone:
return "none" return "none"
case EncPlain:
return "plain"
case EncXOR: case EncXOR:
return "XOR" return "XOR"
case EncDoubleDelta:
return "doubleDelta"
} }
return "<unknown>" return "<unknown>"
} }
@ -31,9 +22,7 @@ func (e Encoding) String() string {
// The different available chunk encodings. // The different available chunk encodings.
const ( const (
EncNone Encoding = iota EncNone Encoding = iota
EncPlain
EncXOR EncXOR
EncDoubleDelta
) )
var ( var (
@ -44,160 +33,53 @@ var (
// Chunk holds a sequence of sample pairs that can be iterated over and appended to. // Chunk holds a sequence of sample pairs that can be iterated over and appended to.
type Chunk interface { type Chunk interface {
Data() []byte Bytes() []byte
Appender() Appender Appender() (Appender, error)
Iterator() Iterator Iterator() Iterator
} }
// FromData returns a chunk from a byte slice of chunk data. // FromBytes returns a chunk from a byte slice of chunk data.
func FromData(d []byte) (Chunk, error) { func FromBytes(d []byte) (Chunk, error) {
if len(d) == 0 { if len(d) < 1 {
return nil, fmt.Errorf("no data") return nil, fmt.Errorf("no data")
} }
e := Encoding(d[0]) e := Encoding(d[0])
switch e { switch e {
case EncPlain: case EncXOR:
rc := rawChunk{d: d, l: uint64(len(d))} return &XORChunk{
return &PlainChunk{rawChunk: rc}, nil b: &bstream{count: 8},
num: binary.LittleEndian.Uint16(d[1:3]),
}, nil
} }
return nil, fmt.Errorf("unknown chunk encoding: %d", e) return nil, fmt.Errorf("unknown chunk encoding: %d", e)
} }
// Iterator provides iterating access over sample pairs in a chunk. // Iterator provides iterating access over sample pairs in chunks.
type Iterator interface { type Iterator interface {
// Seek moves the iterator to the element at or after the given time StreamingIterator
// and returns the sample pair at the position.
Seek(model.Time) (model.SamplePair, bool)
// Next returns the next sample pair in the iterator.
Next() (model.SamplePair, bool)
// SeekBefore(model.Time) (model.SamplePair, bool) // Seek(t int64) bool
First() (model.SamplePair, bool) // SeekBefore(t int64) bool
// Last() (model.SamplePair, bool) // Next() bool
// Values() (int64, float64)
// Err returns a non-nil error if Next or Seek returned false. // Err() error
// Their behavior on subsequent calls after one of them returned false
// is undefined.
Err() error
} }
// Appender adds sample pairs to a chunk. // Appender adds sample pairs to a chunk.
type Appender interface { type Appender interface {
Append(model.Time, model.SampleValue) error Append(int64, float64) error
} }
// rawChunk provides a basic byte slice and is used by higher-level // StreamingIterator is a simple iterator that can only get the next value.
// Chunk implementations. It can be safely appended to without causing type StreamingIterator interface {
// any further allocations. Values() (int64, float64)
type rawChunk struct { Err() error
d []byte Next() bool
l uint64
} }
func newRawChunk(sz int, enc Encoding) rawChunk { // fancyIterator wraps a StreamingIterator and implements a regular
c := rawChunk{d: make([]byte, sz), l: 1} // Iterator with it.
c.d[0] = byte(enc) type fancyIterator struct {
return c StreamingIterator
}
func (c *rawChunk) encoding() Encoding {
return Encoding(c.d[0])
}
func (c *rawChunk) Data() []byte {
return c.d[:c.l]
}
func (c *rawChunk) append(b []byte) error {
if len(b) > len(c.d)-int(c.l) {
return ErrChunkFull
}
copy(c.d[c.l:], b)
// Atomically increment the length so we can safely retrieve iterators
// for a chunk that is being appended to.
// This does not make it safe for concurrent appends!
atomic.AddUint64(&c.l, uint64(len(b)))
return nil
}
// PlainChunk implements a Chunk using simple 16 byte representations
// of sample pairs.
type PlainChunk struct {
rawChunk
}
// NewPlainChunk returns a new chunk using EncPlain.
func NewPlainChunk(sz int) *PlainChunk {
return &PlainChunk{rawChunk: newRawChunk(sz, EncPlain)}
}
// Iterator implements the Chunk interface.
func (c *PlainChunk) Iterator() Iterator {
return &plainChunkIterator{c: c.d[1:c.l]}
}
// Appender implements the Chunk interface.
func (c *PlainChunk) Appender() Appender {
return &plainChunkAppender{c: &c.rawChunk}
}
type plainChunkAppender struct {
c *rawChunk
}
// Append implements the Appender interface,
func (a *plainChunkAppender) Append(ts model.Time, v model.SampleValue) error {
b := make([]byte, 16)
binary.LittleEndian.PutUint64(b, uint64(ts))
binary.LittleEndian.PutUint64(b[8:], math.Float64bits(float64(v)))
return a.c.append(b)
}
type plainChunkIterator struct {
c []byte // chunk data
pos int // position of last emitted element
err error // last error
}
func (it *plainChunkIterator) Err() error {
return it.err
}
func (it *plainChunkIterator) timeAt(pos int) model.Time {
return model.Time(binary.LittleEndian.Uint64(it.c[pos:]))
}
func (it *plainChunkIterator) valueAt(pos int) model.SampleValue {
return model.SampleValue(math.Float64frombits(binary.LittleEndian.Uint64(it.c[pos:])))
}
func (it *plainChunkIterator) First() (model.SamplePair, bool) {
it.pos = 0
return it.Next()
}
func (it *plainChunkIterator) Seek(ts model.Time) (model.SamplePair, bool) {
for it.pos = 0; it.pos < len(it.c); it.pos += 16 {
if t := it.timeAt(it.pos); t >= ts {
return model.SamplePair{
Timestamp: t,
Value: it.valueAt(it.pos + 8),
}, true
}
}
it.err = io.EOF
return model.SamplePair{}, false
}
func (it *plainChunkIterator) Next() (model.SamplePair, bool) {
it.pos += 16
if it.pos >= len(it.c) {
it.err = io.EOF
return model.SamplePair{}, false
}
return model.SamplePair{
Timestamp: it.timeAt(it.pos),
Value: it.valueAt(it.pos + 8),
}, true
} }

View file

@ -1,99 +1,41 @@
package chunks package chunks
import ( import (
"encoding/binary"
"fmt" "fmt"
"io" "io"
"math"
"math/rand" "math/rand"
"testing" "testing"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
func TestRawChunkAppend(t *testing.T) { type pair struct {
c := newRawChunk(1, 'a') t int64
require.NoError(t, c.append(nil)) v float64
require.Error(t, c.append([]byte("t")))
c = newRawChunk(5, 'a')
require.NoError(t, c.append([]byte("test")))
require.Error(t, c.append([]byte("x")))
require.Equal(t, rawChunk{d: []byte("atest"), l: 5}, c)
require.Equal(t, []byte("atest"), c.Data())
}
func TestPlainAppender(t *testing.T) {
c := NewPlainChunk(3*16 + 1)
a := c.Appender()
require.NoError(t, a.Append(1, 1))
require.NoError(t, a.Append(2, 2))
require.NoError(t, a.Append(3, 3))
require.Equal(t, ErrChunkFull, a.Append(4, 4))
exp := []byte{byte(EncPlain)}
b := make([]byte, 8)
for i := 1; i <= 3; i++ {
binary.LittleEndian.PutUint64(b, uint64(i))
exp = append(exp, b...)
binary.LittleEndian.PutUint64(b, math.Float64bits(float64(i)))
exp = append(exp, b...)
}
require.Equal(t, exp, c.Data())
}
func TestPlainIterator(t *testing.T) {
c := NewPlainChunk(100*16 + 1)
a := c.Appender()
var exp []model.SamplePair
for i := 0; i < 100; i++ {
exp = append(exp, model.SamplePair{
Timestamp: model.Time(i * 2),
Value: model.SampleValue(i * 2),
})
require.NoError(t, a.Append(model.Time(i*2), model.SampleValue(i*2)))
}
it := c.Iterator()
var res1 []model.SamplePair
for s, ok := it.Seek(0); ok; s, ok = it.Next() {
res1 = append(res1, s)
}
require.Equal(t, io.EOF, it.Err())
require.Equal(t, exp, res1)
var res2 []model.SamplePair
for s, ok := it.Seek(11); ok; s, ok = it.Next() {
res2 = append(res2, s)
}
require.Equal(t, io.EOF, it.Err())
require.Equal(t, exp[6:], res2)
} }
func benchmarkIterator(b *testing.B, newChunk func(int) Chunk) { func benchmarkIterator(b *testing.B, newChunk func(int) Chunk) {
var ( var (
baseT = model.Now() t = int64(1234123324)
baseV = 1243535 v = 1243535.123
) )
var exp []model.SamplePair var exp []pair
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
baseT += model.Time(rand.Intn(10000)) t += int64(rand.Intn(10000) + 1)
baseV += rand.Intn(10000) v = rand.Float64()
exp = append(exp, model.SamplePair{ exp = append(exp, pair{t: t, v: v})
Timestamp: baseT,
Value: model.SampleValue(baseV),
})
} }
var chunks []Chunk var chunks []Chunk
for i := 0; i < b.N; { for i := 0; i < b.N; {
c := newChunk(1024) c := newChunk(1024)
a := c.Appender()
for i < b.N { a, err := c.Appender()
if err := a.Append(exp[i].Timestamp, exp[i].Value); err == ErrChunkFull { if err != nil {
b.Fatalf("get appender: %s", err)
}
for _, p := range exp {
if err := a.Append(p.t, p.v); err == ErrChunkFull {
break break
} else if err != nil { } else if err != nil {
b.Fatal(err) b.Fatal(err)
@ -105,15 +47,18 @@ func benchmarkIterator(b *testing.B, newChunk func(int) Chunk) {
b.ReportAllocs() b.ReportAllocs()
b.ResetTimer() b.ResetTimer()
fmt.Println("num", b.N)
res := make([]model.SamplePair, 0, 1024) fmt.Println("num", b.N, "created chunks", len(chunks))
res := make([]float64, 0, 1024)
for i := 0; i < len(chunks); i++ { for i := 0; i < len(chunks); i++ {
c := chunks[i] c := chunks[i]
it := c.Iterator() it := c.Iterator()
for s, ok := it.First(); ok; s, ok = it.Next() { for it.Next() {
res = append(res, s) _, v := it.Values()
res = append(res, v)
} }
if it.Err() != io.EOF { if it.Err() != io.EOF {
require.NoError(b, it.Err()) require.NoError(b, it.Err())
@ -122,18 +67,6 @@ func benchmarkIterator(b *testing.B, newChunk func(int) Chunk) {
} }
} }
func BenchmarkPlainIterator(b *testing.B) {
benchmarkIterator(b, func(sz int) Chunk {
return NewPlainChunk(sz)
})
}
func BenchmarkDoubleDeltaIterator(b *testing.B) {
benchmarkIterator(b, func(sz int) Chunk {
return NewDoubleDeltaChunk(sz)
})
}
func BenchmarkXORIterator(b *testing.B) { func BenchmarkXORIterator(b *testing.B) {
benchmarkIterator(b, func(sz int) Chunk { benchmarkIterator(b, func(sz int) Chunk {
return NewXORChunk(sz) return NewXORChunk(sz)
@ -148,17 +81,14 @@ func BenchmarkXORAppender(b *testing.B) {
func benchmarkAppender(b *testing.B, newChunk func(int) Chunk) { func benchmarkAppender(b *testing.B, newChunk func(int) Chunk) {
var ( var (
baseT = model.Now() t = int64(1234123324)
baseV = 1243535 v = 1243535.123
) )
var exp []model.SamplePair var exp []pair
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
baseT += model.Time(rand.Intn(10000)) t += int64(rand.Intn(10000) + 1)
baseV += rand.Intn(10000) v = rand.Float64()
exp = append(exp, model.SamplePair{ exp = append(exp, pair{t: t, v: v})
Timestamp: baseT,
Value: model.SampleValue(baseV),
})
} }
b.ReportAllocs() b.ReportAllocs()
@ -167,9 +97,13 @@ func benchmarkAppender(b *testing.B, newChunk func(int) Chunk) {
var chunks []Chunk var chunks []Chunk
for i := 0; i < b.N; { for i := 0; i < b.N; {
c := newChunk(1024) c := newChunk(1024)
a := c.Appender()
for i < b.N { a, err := c.Appender()
if err := a.Append(exp[i].Timestamp, exp[i].Value); err == ErrChunkFull { if err != nil {
b.Fatalf("get appender: %s", err)
}
for _, p := range exp {
if err := a.Append(p.t, p.v); err == ErrChunkFull {
break break
} else if err != nil { } else if err != nil {
b.Fatal(err) b.Fatal(err)
@ -178,17 +112,6 @@ func benchmarkAppender(b *testing.B, newChunk func(int) Chunk) {
} }
chunks = append(chunks, c) chunks = append(chunks, c)
} }
fmt.Println("created chunks", len(chunks))
}
func BenchmarkPlainAppender(b *testing.B) { fmt.Println("num", b.N, "created chunks", len(chunks))
benchmarkAppender(b, func(sz int) Chunk {
return NewPlainChunk(sz)
})
}
func BenchmarkDoubleDeltaAppender(b *testing.B) {
benchmarkAppender(b, func(sz int) Chunk {
return NewDoubleDeltaChunk(sz)
})
} }

View file

@ -1,192 +0,0 @@
package chunks
import (
"encoding/binary"
"errors"
"io"
"math"
"github.com/prometheus/common/model"
)
// DoubleDeltaChunk stores delta-delta encoded sample data.
type DoubleDeltaChunk struct {
rawChunk
}
// NewDoubleDeltaChunk returns a new chunk using double delta encoding.
func NewDoubleDeltaChunk(sz int) *DoubleDeltaChunk {
return &DoubleDeltaChunk{rawChunk: newRawChunk(sz, EncDoubleDelta)}
}
// Iterator implements the Chunk interface.
func (c *DoubleDeltaChunk) Iterator() Iterator {
return &doubleDeltaIterator{d: c.d[1:c.l]}
}
// Appender implements the Chunk interface.
func (c *DoubleDeltaChunk) Appender() Appender {
return &doubleDeltaAppender{c: &c.rawChunk}
}
type doubleDeltaIterator struct {
d []byte
err error
pos, num int
curT, curV int64
nextT, nextV int64
deltaV int64
deltaT uint64
}
func (it *doubleDeltaIterator) Err() error {
return it.err
}
func (it *doubleDeltaIterator) readPair() bool {
if len(it.d) == it.pos {
return false
}
var (
n, m int
ddv int64
ddt uint64
)
it.curT = it.nextT
it.curV = it.nextV
if it.num > 1 {
ddt, n = binary.Uvarint(it.d[it.pos:])
ddv, m = binary.Varint(it.d[it.pos+n:])
it.deltaT += ddt
it.deltaV += ddv
it.nextT += int64(it.deltaT)
it.nextV += it.deltaV
} else if it.num == 1 {
it.deltaT, n = binary.Uvarint(it.d[it.pos:])
it.deltaV, m = binary.Varint(it.d[it.pos+n:])
it.nextT += int64(it.deltaT)
it.nextV += it.deltaV
} else {
it.nextT, n = binary.Varint(it.d[it.pos:])
it.nextV, m = binary.Varint(it.d[it.pos+n:])
}
it.pos += n + m
it.num++
return true
}
func (it *doubleDeltaIterator) First() (model.SamplePair, bool) {
it.pos = 0
it.num = 0
if !it.readPair() {
it.err = io.EOF
return model.SamplePair{}, false
}
return it.Next()
}
func (it *doubleDeltaIterator) Seek(ts model.Time) (model.SamplePair, bool) {
if int64(ts) < it.nextT {
it.pos = 0
it.num = 0
if !it.readPair() {
it.err = io.EOF
return model.SamplePair{}, false
}
if _, ok := it.Next(); !ok {
return model.SamplePair{}, false
}
}
for {
if it.nextT > int64(ts) {
if it.num < 2 {
it.err = io.EOF
return model.SamplePair{}, false
}
return model.SamplePair{
Timestamp: model.Time(it.curT),
Value: model.SampleValue(it.curV),
}, true
}
if _, ok := it.Next(); !ok {
return model.SamplePair{}, false
}
}
}
func (it *doubleDeltaIterator) Next() (model.SamplePair, bool) {
if it.err == io.EOF {
return model.SamplePair{}, false
}
res := model.SamplePair{
Timestamp: model.Time(it.nextT),
Value: model.SampleValue(it.nextV),
}
if !it.readPair() {
it.err = io.EOF
}
return res, true
}
type doubleDeltaAppender struct {
c *rawChunk
buf [16]byte
num int // stored values so far.
lastV, lastVDelta int64
lastT int64
lastTDelta uint64
}
func isInt(f model.SampleValue) (int64, bool) {
x, frac := math.Modf(float64(f))
if frac != 0 {
return 0, false
}
return int64(x), true
}
// ErrNoInteger is returned if a non-integer is appended to
// a double delta chunk.
var ErrNoInteger = errors.New("not an integer")
func (a *doubleDeltaAppender) Append(ts model.Time, fv model.SampleValue) error {
v, ok := isInt(fv)
if !ok {
return ErrNoInteger
}
if a.num == 0 {
n := binary.PutVarint(a.buf[:], int64(ts))
n += binary.PutVarint(a.buf[n:], v)
if err := a.c.append(a.buf[:n]); err != nil {
return err
}
a.lastT, a.lastV = int64(ts), v
a.num++
return nil
}
if a.num == 1 {
a.lastTDelta, a.lastVDelta = uint64(int64(ts)-a.lastT), v-a.lastV
n := binary.PutUvarint(a.buf[:], a.lastTDelta)
n += binary.PutVarint(a.buf[n:], a.lastVDelta)
if err := a.c.append(a.buf[:n]); err != nil {
return err
}
} else {
predT, predV := a.lastT+int64(a.lastTDelta), a.lastV+a.lastVDelta
tdd := uint64(int64(ts) - predT)
vdd := v - predV
n := binary.PutUvarint(a.buf[:], tdd)
n += binary.PutVarint(a.buf[n:], vdd)
if err := a.c.append(a.buf[:n]); err != nil {
return err
}
a.lastTDelta += tdd
a.lastVDelta += vdd
}
a.lastT, a.lastV = int64(ts), v
a.num++
return nil
}

View file

@ -1,58 +0,0 @@
package chunks
import (
"io"
"math/rand"
"testing"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/require"
)
func testDoubleDeltaChunk(t *testing.T) {
ts := model.Time(14345645)
v := int64(123123)
var input []model.SamplePair
for i := 0; i < 2000; i++ {
ts += model.Time(rand.Int63n(100) + 1)
v += rand.Int63n(1000)
if rand.Int() > 0 {
v *= -1
}
input = append(input, model.SamplePair{
Timestamp: ts,
Value: model.SampleValue(v),
})
}
c := NewDoubleDeltaChunk(rand.Intn(3000))
app := c.Appender()
for i, s := range input {
err := app.Append(s.Timestamp, s.Value)
if err == ErrChunkFull {
input = input[:i]
break
}
require.NoError(t, err, "at sample %d: %v", i, s)
}
result := []model.SamplePair{}
it := c.Iterator()
for s, ok := it.First(); ok; s, ok = it.Next() {
result = append(result, s)
}
if it.Err() != io.EOF {
require.NoError(t, it.Err())
}
require.Equal(t, input, result)
}
func TestDoubleDeltaChunk(t *testing.T) {
for i := 0; i < 10000; i++ {
testDoubleDeltaChunk(t)
}
}

View file

@ -1,46 +1,82 @@
package chunks package chunks
import ( import (
"encoding/binary"
"math" "math"
bits "github.com/dgryski/go-bits" bits "github.com/dgryski/go-bits"
"github.com/prometheus/common/model"
) )
// XORChunk holds XOR encoded sample data. // XORChunk holds XOR encoded sample data.
type XORChunk struct { type XORChunk struct {
bstream b *bstream
num uint16 num uint16
sz int sz int
lastLen int
lastCount uint8
} }
// NewXORChunk returns a new chunk with XOR encoding of the given size. // NewXORChunk returns a new chunk with XOR encoding of the given size.
func NewXORChunk(sz int) *XORChunk { func NewXORChunk(size int) *XORChunk {
return &XORChunk{sz: sz} b := make([]byte, 3, 64)
b[0] = byte(EncXOR)
return &XORChunk{
b: &bstream{stream: b, count: 0},
sz: size,
num: 0,
}
} }
func (c *XORChunk) Data() []byte { // Bytes returns the underlying byte slice of the chunk.
return nil func (c *XORChunk) Bytes() []byte {
b := c.b.bytes()
// Lazily populate length bytes probably not necessary to have the
// cache value in struct.
binary.LittleEndian.PutUint16(b[1:3], c.num)
return b
} }
// Appender implements the Chunk interface. // Appender implements the Chunk interface.
func (c *XORChunk) Appender() Appender { func (c *XORChunk) Appender() (Appender, error) {
return &xorAppender{c: c} it := c.iterator()
// To get an appender we must know the state it would have if we had
// appended all existing data from scratch.
// We iterate through the end and populate via the iterator's state.
for it.Next() {
}
if err := it.Err(); err != nil {
return nil, err
}
return &xorAppender{
c: c,
b: c.b,
t: it.t,
v: it.val,
tDelta: it.tDelta,
leading: it.leading,
trailing: it.trailing,
}, nil
}
func (c *XORChunk) iterator() *xorIterator {
// Should iterators guarantee to act on a copy of the data so it doesn't lock append?
// When using striped locks to guard access to chunks, probably yes.
// Could only copy data if the chunk is not completed yet.
return &xorIterator{
br: newBReader(c.b.bytes()[3:]),
numTotal: c.num,
}
} }
// Iterator implements the Chunk interface. // Iterator implements the Chunk interface.
func (c *XORChunk) Iterator() Iterator { func (c *XORChunk) Iterator() Iterator {
br := c.bstream.clone() return fancyIterator{c.iterator()}
br.count = 8
return &xorIterator{br: br, numTotal: c.num}
} }
type xorAppender struct { type xorAppender struct {
c *XORChunk c *XORChunk
b *bstream
t int64 t int64
v float64 v float64
@ -48,26 +84,20 @@ type xorAppender struct {
leading uint8 leading uint8
trailing uint8 trailing uint8
finished bool
} }
func (a *xorAppender) Append(ts model.Time, v model.SampleValue) error { func (a *xorAppender) Append(t int64, v float64) error {
// TODO(fabxc): remove Prometheus types from interface.
return a.append(int64(ts), float64(v))
}
func (a *xorAppender) append(t int64, v float64) error {
var tDelta uint64 var tDelta uint64
if a.c.num == 0 { if a.c.num == 0 {
// TODO: store varint time? // TODO: store varint time?
a.c.writeBits(uint64(t), 64) a.b.writeBits(uint64(t), 64)
a.c.writeBits(math.Float64bits(v), 64) a.b.writeBits(math.Float64bits(v), 64)
} else if a.c.num == 1 { } else if a.c.num == 1 {
tDelta = uint64(t - a.t) tDelta = uint64(t - a.t)
// TODO: use varint or other encoding for first delta? // TODO: use varint or other encoding for first delta?
a.c.writeBits(tDelta, 64) a.b.writeBits(tDelta, 64)
a.writeVDelta(v) a.writeVDelta(v)
} else { } else {
@ -78,25 +108,25 @@ func (a *xorAppender) append(t int64, v float64) error {
// Thus we use higher value range steps with larger bit size. // Thus we use higher value range steps with larger bit size.
switch { switch {
case dod == 0: case dod == 0:
a.c.writeBit(zero) a.b.writeBit(zero)
case -8191 <= dod && dod <= 8192: case -8191 <= dod && dod <= 8192:
a.c.writeBits(0x02, 2) // '10' a.b.writeBits(0x02, 2) // '10'
a.c.writeBits(uint64(dod), 14) a.b.writeBits(uint64(dod), 14)
case -65535 <= dod && dod <= 65536: case -65535 <= dod && dod <= 65536:
a.c.writeBits(0x06, 3) // '110' a.b.writeBits(0x06, 3) // '110'
a.c.writeBits(uint64(dod), 17) a.b.writeBits(uint64(dod), 17)
case -524287 <= dod && dod <= 524288: case -524287 <= dod && dod <= 524288:
a.c.writeBits(0x0e, 4) // '1110' a.b.writeBits(0x0e, 4) // '1110'
a.c.writeBits(uint64(dod), 20) a.b.writeBits(uint64(dod), 20)
default: default:
a.c.writeBits(0x0f, 4) // '1111' a.b.writeBits(0x0f, 4) // '1111'
a.c.writeBits(uint64(dod), 64) a.b.writeBits(uint64(dod), 64)
} }
a.writeVDelta(v) a.writeVDelta(v)
} }
if len(a.c.stream) > a.c.sz { if len(a.b.bytes()) > a.c.sz {
return ErrChunkFull return ErrChunkFull
} }
@ -104,8 +134,7 @@ func (a *xorAppender) append(t int64, v float64) error {
a.v = v a.v = v
a.c.num++ a.c.num++
a.tDelta = tDelta a.tDelta = tDelta
a.c.lastCount = a.c.count
a.c.lastLen = len(a.c.stream)
return nil return nil
} }
@ -113,10 +142,10 @@ func (a *xorAppender) writeVDelta(v float64) {
vDelta := math.Float64bits(v) ^ math.Float64bits(a.v) vDelta := math.Float64bits(v) ^ math.Float64bits(a.v)
if vDelta == 0 { if vDelta == 0 {
a.c.writeBit(zero) a.b.writeBit(zero)
return return
} }
a.c.writeBit(one) a.b.writeBit(one)
leading := uint8(bits.Clz(vDelta)) leading := uint8(bits.Clz(vDelta))
trailing := uint8(bits.Ctz(vDelta)) trailing := uint8(bits.Ctz(vDelta))
@ -128,20 +157,20 @@ func (a *xorAppender) writeVDelta(v float64) {
// TODO(dgryski): check if it's 'cheaper' to reset the leading/trailing bits instead // TODO(dgryski): check if it's 'cheaper' to reset the leading/trailing bits instead
if a.leading != ^uint8(0) && leading >= a.leading && trailing >= a.trailing { if a.leading != ^uint8(0) && leading >= a.leading && trailing >= a.trailing {
a.c.writeBit(zero) a.b.writeBit(zero)
a.c.writeBits(vDelta>>a.trailing, 64-int(a.leading)-int(a.trailing)) a.b.writeBits(vDelta>>a.trailing, 64-int(a.leading)-int(a.trailing))
} else { } else {
a.leading, a.trailing = leading, trailing a.leading, a.trailing = leading, trailing
a.c.writeBit(one) a.b.writeBit(one)
a.c.writeBits(uint64(leading), 5) a.b.writeBits(uint64(leading), 5)
// Note that if leading == trailing == 0, then sigbits == 64. But that value doesn't actually fit into the 6 bits we have. // Note that if leading == trailing == 0, then sigbits == 64. But that value doesn't actually fit into the 6 bits we have.
// Luckily, we never need to encode 0 significant bits, since that would put us in the other case (vdelta == 0). // Luckily, we never need to encode 0 significant bits, since that would put us in the other case (vdelta == 0).
// So instead we write out a 0 and adjust it back to 64 on unpacking. // So instead we write out a 0 and adjust it back to 64 on unpacking.
sigbits := 64 - leading - trailing sigbits := 64 - leading - trailing
a.c.writeBits(uint64(sigbits), 6) a.b.writeBits(uint64(sigbits), 6)
a.c.writeBits(vDelta>>trailing, int(sigbits)) a.b.writeBits(vDelta>>trailing, int(sigbits))
} }
} }
@ -156,7 +185,7 @@ type xorIterator struct {
leading uint8 leading uint8
trailing uint8 trailing uint8
tDelta int64 tDelta uint64
err error err error
} }
@ -164,16 +193,15 @@ func (it *xorIterator) Values() (int64, float64) {
return it.t, it.val return it.t, it.val
} }
func (it *xorIterator) NextB() bool { func (it *xorIterator) Err() error {
return it.err
}
func (it *xorIterator) Next() bool {
if it.err != nil || it.numRead == it.numTotal { if it.err != nil || it.numRead == it.numTotal {
return false return false
} }
var d byte
var dod int32
var sz uint
var tDelta int64
if it.numRead == 0 { if it.numRead == 0 {
t, err := it.br.readBits(64) t, err := it.br.readBits(64)
if err != nil { if err != nil {
@ -197,12 +225,13 @@ func (it *xorIterator) NextB() bool {
it.err = err it.err = err
return false return false
} }
it.tDelta = int64(tDelta) it.tDelta = tDelta
it.t = it.t + it.tDelta it.t = it.t + int64(it.tDelta)
goto ReadValue return it.readValue()
} }
var d byte
// read delta-of-delta // read delta-of-delta
for i := 0; i < 4; i++ { for i := 0; i < 4; i++ {
d <<= 1 d <<= 1
@ -216,7 +245,8 @@ func (it *xorIterator) NextB() bool {
} }
d |= 1 d |= 1
} }
var sz uint8
var dod int64
switch d { switch d {
case 0x00: case 0x00:
// dod == 0 // dod == 0
@ -233,7 +263,7 @@ func (it *xorIterator) NextB() bool {
return false return false
} }
dod = int32(bits) dod = int64(bits)
} }
if sz != 0 { if sz != 0 {
@ -246,16 +276,16 @@ func (it *xorIterator) NextB() bool {
// or something // or something
bits = bits - (1 << sz) bits = bits - (1 << sz)
} }
dod = int32(bits) dod = int64(bits)
} }
tDelta = it.tDelta + int64(dod) it.tDelta = uint64(int64(it.tDelta) + dod)
it.t = it.t + int64(it.tDelta)
it.tDelta = tDelta return it.readValue()
it.t = it.t + it.tDelta }
ReadValue: func (it *xorIterator) readValue() bool {
// read compressed value
bit, err := it.br.readBit() bit, err := it.br.readBit()
if err != nil { if err != nil {
it.err = err it.err = err
@ -265,8 +295,8 @@ ReadValue:
if bit == zero { if bit == zero {
// it.val = it.val // it.val = it.val
} else { } else {
bit, itErr := it.br.readBit() bit, err := it.br.readBit()
if itErr != nil { if err != nil {
it.err = err it.err = err
return false return false
} }
@ -308,19 +338,3 @@ ReadValue:
it.numRead++ it.numRead++
return true return true
} }
func (it *xorIterator) First() (model.SamplePair, bool) {
return model.SamplePair{}, false
}
func (it *xorIterator) Seek(ts model.Time) (model.SamplePair, bool) {
return model.SamplePair{}, false
}
func (it *xorIterator) Next() (model.SamplePair, bool) {
return model.SamplePair{}, false
}
func (it *xorIterator) Err() error {
return it.err
}

View file

@ -1,61 +0,0 @@
package chunks
import (
"math/rand"
"testing"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/require"
)
func testXORChunk(t *testing.T) {
ts := model.Time(124213233)
v := int64(99954541)
var input []model.SamplePair
for i := 0; i < 10000; i++ {
ts += model.Time(rand.Int63n(50000) + 1)
v += rand.Int63n(1000)
if rand.Int() > 0 {
v *= -1
}
input = append(input, model.SamplePair{
Timestamp: ts,
Value: model.SampleValue(v),
})
}
c := NewXORChunk(rand.Intn(3000))
app := c.Appender()
for i, s := range input {
err := app.Append(s.Timestamp, s.Value)
if err == ErrChunkFull {
input = input[:i]
break
}
require.NoError(t, err, "at sample %d: %v", i, s)
}
result := []model.SamplePair{}
it := c.Iterator().(*xorIterator)
for {
ok := it.NextB()
if !ok {
break
}
t, v := it.Values()
result = append(result, model.SamplePair{Timestamp: model.Time(t), Value: model.SampleValue(v)})
}
require.NoError(t, it.Err())
require.Equal(t, input, result)
}
func TestXORChunk(t *testing.T) {
for i := 0; i < 10; i++ {
testXORChunk(t)
}
}