diff --git a/chunks/bstream.go b/chunks/bstream.go index c595e01921..48cc6cbfe6 100644 --- a/chunks/bstream.go +++ b/chunks/bstream.go @@ -145,7 +145,6 @@ func (b *bstream) readByte() (byte, error) { } func (b *bstream) readBits(nbits int) (uint64, error) { - var u uint64 for nbits >= 8 { diff --git a/chunks/chunk.go b/chunks/chunk.go index 2a453d81b2..c0594a5b57 100644 --- a/chunks/chunk.go +++ b/chunks/chunk.go @@ -4,11 +4,6 @@ import ( "encoding/binary" "errors" "fmt" - "io" - "math" - "sync/atomic" - - "github.com/prometheus/common/model" ) // Encoding is the identifier for a chunk encoding @@ -18,12 +13,8 @@ func (e Encoding) String() string { switch e { case EncNone: return "none" - case EncPlain: - return "plain" case EncXOR: return "XOR" - case EncDoubleDelta: - return "doubleDelta" } return "" } @@ -31,9 +22,7 @@ func (e Encoding) String() string { // The different available chunk encodings. const ( EncNone Encoding = iota - EncPlain EncXOR - EncDoubleDelta ) var ( @@ -44,160 +33,53 @@ var ( // Chunk holds a sequence of sample pairs that can be iterated over and appended to. type Chunk interface { - Data() []byte - Appender() Appender + Bytes() []byte + Appender() (Appender, error) Iterator() Iterator } -// FromData returns a chunk from a byte slice of chunk data. -func FromData(d []byte) (Chunk, error) { - if len(d) == 0 { +// FromBytes returns a chunk from a byte slice of chunk data. +func FromBytes(d []byte) (Chunk, error) { + if len(d) < 1 { return nil, fmt.Errorf("no data") } e := Encoding(d[0]) switch e { - case EncPlain: - rc := rawChunk{d: d, l: uint64(len(d))} - return &PlainChunk{rawChunk: rc}, nil + case EncXOR: + return &XORChunk{ + b: &bstream{count: 8}, + num: binary.LittleEndian.Uint16(d[1:3]), + }, nil } return nil, fmt.Errorf("unknown chunk encoding: %d", e) } -// Iterator provides iterating access over sample pairs in a chunk. +// Iterator provides iterating access over sample pairs in chunks. type Iterator interface { - // Seek moves the iterator to the element at or after the given time - // and returns the sample pair at the position. - Seek(model.Time) (model.SamplePair, bool) - // Next returns the next sample pair in the iterator. - Next() (model.SamplePair, bool) + StreamingIterator - // SeekBefore(model.Time) (model.SamplePair, bool) - First() (model.SamplePair, bool) - // Last() (model.SamplePair, bool) - - // Err returns a non-nil error if Next or Seek returned false. - // Their behavior on subsequent calls after one of them returned false - // is undefined. - Err() error + // Seek(t int64) bool + // SeekBefore(t int64) bool + // Next() bool + // Values() (int64, float64) + // Err() error } // Appender adds sample pairs to a chunk. type Appender interface { - Append(model.Time, model.SampleValue) error + Append(int64, float64) error } -// rawChunk provides a basic byte slice and is used by higher-level -// Chunk implementations. It can be safely appended to without causing -// any further allocations. -type rawChunk struct { - d []byte - l uint64 +// StreamingIterator is a simple iterator that can only get the next value. 
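
A minimal usage sketch of the reworked API introduced in this hunk — NewXORChunk, the error-returning Appender, and the Next/Values/Err iteration style of the StreamingIterator described just above. The import path is an assumption for illustration and is not part of this change; the XORChunk implementation it relies on appears further down in this diff.

package main

import (
	"fmt"
	"io"

	"github.com/fabxc/tsdb/chunks" // assumed import path; adjust to wherever this package lives
)

func main() {
	c := chunks.NewXORChunk(1024) // chunk limited to roughly 1024 bytes

	// Appender now returns an error because it may have to re-read the
	// existing stream to recover its state.
	app, err := c.Appender()
	if err != nil {
		panic(err)
	}
	for i := int64(0); i < 100; i++ {
		// Timestamps and values are plain int64/float64 now, not
		// model.Time/model.SampleValue.
		if err := app.Append(1000*i, float64(i)); err != nil {
			panic(err)
		}
	}

	it := c.Iterator()
	for it.Next() {
		t, v := it.Values()
		fmt.Println(t, v)
	}
	// The iterator may report io.EOF at the end; treat that as a clean stop.
	if err := it.Err(); err != nil && err != io.EOF {
		panic(err)
	}
}
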
+type StreamingIterator interface { + Values() (int64, float64) + Err() error + Next() bool } -func newRawChunk(sz int, enc Encoding) rawChunk { - c := rawChunk{d: make([]byte, sz), l: 1} - c.d[0] = byte(enc) - return c -} - -func (c *rawChunk) encoding() Encoding { - return Encoding(c.d[0]) -} - -func (c *rawChunk) Data() []byte { - return c.d[:c.l] -} - -func (c *rawChunk) append(b []byte) error { - if len(b) > len(c.d)-int(c.l) { - return ErrChunkFull - } - copy(c.d[c.l:], b) - // Atomically increment the length so we can safely retrieve iterators - // for a chunk that is being appended to. - // This does not make it safe for concurrent appends! - atomic.AddUint64(&c.l, uint64(len(b))) - return nil -} - -// PlainChunk implements a Chunk using simple 16 byte representations -// of sample pairs. -type PlainChunk struct { - rawChunk -} - -// NewPlainChunk returns a new chunk using EncPlain. -func NewPlainChunk(sz int) *PlainChunk { - return &PlainChunk{rawChunk: newRawChunk(sz, EncPlain)} -} - -// Iterator implements the Chunk interface. -func (c *PlainChunk) Iterator() Iterator { - return &plainChunkIterator{c: c.d[1:c.l]} -} - -// Appender implements the Chunk interface. -func (c *PlainChunk) Appender() Appender { - return &plainChunkAppender{c: &c.rawChunk} -} - -type plainChunkAppender struct { - c *rawChunk -} - -// Append implements the Appender interface, -func (a *plainChunkAppender) Append(ts model.Time, v model.SampleValue) error { - b := make([]byte, 16) - binary.LittleEndian.PutUint64(b, uint64(ts)) - binary.LittleEndian.PutUint64(b[8:], math.Float64bits(float64(v))) - return a.c.append(b) -} - -type plainChunkIterator struct { - c []byte // chunk data - pos int // position of last emitted element - err error // last error -} - -func (it *plainChunkIterator) Err() error { - return it.err -} - -func (it *plainChunkIterator) timeAt(pos int) model.Time { - return model.Time(binary.LittleEndian.Uint64(it.c[pos:])) -} - -func (it *plainChunkIterator) valueAt(pos int) model.SampleValue { - return model.SampleValue(math.Float64frombits(binary.LittleEndian.Uint64(it.c[pos:]))) -} - -func (it *plainChunkIterator) First() (model.SamplePair, bool) { - it.pos = 0 - return it.Next() -} - -func (it *plainChunkIterator) Seek(ts model.Time) (model.SamplePair, bool) { - for it.pos = 0; it.pos < len(it.c); it.pos += 16 { - if t := it.timeAt(it.pos); t >= ts { - return model.SamplePair{ - Timestamp: t, - Value: it.valueAt(it.pos + 8), - }, true - } - } - it.err = io.EOF - return model.SamplePair{}, false -} - -func (it *plainChunkIterator) Next() (model.SamplePair, bool) { - it.pos += 16 - if it.pos >= len(it.c) { - it.err = io.EOF - return model.SamplePair{}, false - } - return model.SamplePair{ - Timestamp: it.timeAt(it.pos), - Value: it.valueAt(it.pos + 8), - }, true +// fancyIterator wraps a StreamingIterator and implements a regular +// Iterator with it. 
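
The struct declared next relies on Go interface embedding: a struct that embeds a StreamingIterator automatically satisfies the wider Iterator interface for as long as Iterator's extra methods stay commented out. A small self-contained sketch of that pattern, with hypothetical names standing in for StreamingIterator/Iterator/fancyIterator:

package main

import "fmt"

// narrow mirrors the shape of StreamingIterator (hypothetical, illustration only).
type narrow interface {
	Next() bool
	Values() (int64, float64)
	Err() error
}

// wide mirrors Iterator: for now it only embeds narrow, so it adds nothing.
type wide interface {
	narrow
	// Seek(t int64) bool could be added here later.
}

// wrapper embeds the narrow interface value; the promoted methods make it
// satisfy wide without any extra code.
type wrapper struct {
	narrow
}

// countdown is a trivial narrow implementation used to drive the example.
type countdown struct{ n int64 }

func (c *countdown) Next() bool {
	c.n--
	return c.n >= 0
}

func (c *countdown) Values() (int64, float64) { return c.n, float64(c.n) / 2 }

func (c *countdown) Err() error { return nil }

func main() {
	var it wide = wrapper{narrow: &countdown{n: 3}}
	for it.Next() {
		t, v := it.Values()
		fmt.Println(t, v)
	}
}
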
+type fancyIterator struct { + StreamingIterator } diff --git a/chunks/chunk_test.go b/chunks/chunk_test.go index 0e7bb8abd6..c89ffac1f8 100644 --- a/chunks/chunk_test.go +++ b/chunks/chunk_test.go @@ -1,99 +1,41 @@ package chunks import ( - "encoding/binary" "fmt" "io" - "math" "math/rand" "testing" - "github.com/prometheus/common/model" "github.com/stretchr/testify/require" ) -func TestRawChunkAppend(t *testing.T) { - c := newRawChunk(1, 'a') - require.NoError(t, c.append(nil)) - require.Error(t, c.append([]byte("t"))) - - c = newRawChunk(5, 'a') - require.NoError(t, c.append([]byte("test"))) - require.Error(t, c.append([]byte("x"))) - require.Equal(t, rawChunk{d: []byte("atest"), l: 5}, c) - require.Equal(t, []byte("atest"), c.Data()) -} - -func TestPlainAppender(t *testing.T) { - c := NewPlainChunk(3*16 + 1) - a := c.Appender() - - require.NoError(t, a.Append(1, 1)) - require.NoError(t, a.Append(2, 2)) - require.NoError(t, a.Append(3, 3)) - require.Equal(t, ErrChunkFull, a.Append(4, 4)) - - exp := []byte{byte(EncPlain)} - b := make([]byte, 8) - for i := 1; i <= 3; i++ { - binary.LittleEndian.PutUint64(b, uint64(i)) - exp = append(exp, b...) - binary.LittleEndian.PutUint64(b, math.Float64bits(float64(i))) - exp = append(exp, b...) - } - require.Equal(t, exp, c.Data()) -} - -func TestPlainIterator(t *testing.T) { - c := NewPlainChunk(100*16 + 1) - a := c.Appender() - - var exp []model.SamplePair - for i := 0; i < 100; i++ { - exp = append(exp, model.SamplePair{ - Timestamp: model.Time(i * 2), - Value: model.SampleValue(i * 2), - }) - require.NoError(t, a.Append(model.Time(i*2), model.SampleValue(i*2))) - } - - it := c.Iterator() - - var res1 []model.SamplePair - for s, ok := it.Seek(0); ok; s, ok = it.Next() { - res1 = append(res1, s) - } - require.Equal(t, io.EOF, it.Err()) - require.Equal(t, exp, res1) - - var res2 []model.SamplePair - for s, ok := it.Seek(11); ok; s, ok = it.Next() { - res2 = append(res2, s) - } - require.Equal(t, io.EOF, it.Err()) - require.Equal(t, exp[6:], res2) +type pair struct { + t int64 + v float64 } func benchmarkIterator(b *testing.B, newChunk func(int) Chunk) { var ( - baseT = model.Now() - baseV = 1243535 + t = int64(1234123324) + v = 1243535.123 ) - var exp []model.SamplePair + var exp []pair for i := 0; i < b.N; i++ { - baseT += model.Time(rand.Intn(10000)) - baseV += rand.Intn(10000) - exp = append(exp, model.SamplePair{ - Timestamp: baseT, - Value: model.SampleValue(baseV), - }) + t += int64(rand.Intn(10000) + 1) + v = rand.Float64() + exp = append(exp, pair{t: t, v: v}) } + var chunks []Chunk for i := 0; i < b.N; { c := newChunk(1024) - a := c.Appender() - for i < b.N { - if err := a.Append(exp[i].Timestamp, exp[i].Value); err == ErrChunkFull { + + a, err := c.Appender() + if err != nil { + b.Fatalf("get appender: %s", err) + } + for _, p := range exp { + if err := a.Append(p.t, p.v); err == ErrChunkFull { break } else if err != nil { b.Fatal(err) @@ -105,15 +47,18 @@ func benchmarkIterator(b *testing.B, newChunk func(int) Chunk) { b.ReportAllocs() b.ResetTimer() - fmt.Println("num", b.N) - res := make([]model.SamplePair, 0, 1024) + fmt.Println("num", b.N, "created chunks", len(chunks)) + + res := make([]float64, 0, 1024) + for i := 0; i < len(chunks); i++ { c := chunks[i] it := c.Iterator() - for s, ok := it.First(); ok; s, ok = it.Next() { - res = append(res, s) + for it.Next() { + _, v := it.Values() + res = append(res, v) } if it.Err() != io.EOF { require.NoError(b, it.Err()) @@ -122,18 +67,6 @@ func benchmarkIterator(b *testing.B, newChunk 
func(int) Chunk) { } } -func BenchmarkPlainIterator(b *testing.B) { - benchmarkIterator(b, func(sz int) Chunk { - return NewPlainChunk(sz) - }) -} - -func BenchmarkDoubleDeltaIterator(b *testing.B) { - benchmarkIterator(b, func(sz int) Chunk { - return NewDoubleDeltaChunk(sz) - }) -} - func BenchmarkXORIterator(b *testing.B) { benchmarkIterator(b, func(sz int) Chunk { return NewXORChunk(sz) @@ -148,17 +81,14 @@ func BenchmarkXORAppender(b *testing.B) { func benchmarkAppender(b *testing.B, newChunk func(int) Chunk) { var ( - baseT = model.Now() - baseV = 1243535 + t = int64(1234123324) + v = 1243535.123 ) - var exp []model.SamplePair + var exp []pair for i := 0; i < b.N; i++ { - baseT += model.Time(rand.Intn(10000)) - baseV += rand.Intn(10000) - exp = append(exp, model.SamplePair{ - Timestamp: baseT, - Value: model.SampleValue(baseV), - }) + t += int64(rand.Intn(10000) + 1) + v = rand.Float64() + exp = append(exp, pair{t: t, v: v}) } b.ReportAllocs() @@ -167,9 +97,13 @@ func benchmarkAppender(b *testing.B, newChunk func(int) Chunk) { var chunks []Chunk for i := 0; i < b.N; { c := newChunk(1024) - a := c.Appender() - for i < b.N { - if err := a.Append(exp[i].Timestamp, exp[i].Value); err == ErrChunkFull { + + a, err := c.Appender() + if err != nil { + b.Fatalf("get appender: %s", err) + } + for _, p := range exp { + if err := a.Append(p.t, p.v); err == ErrChunkFull { break } else if err != nil { b.Fatal(err) @@ -178,17 +112,6 @@ func benchmarkAppender(b *testing.B, newChunk func(int) Chunk) { } chunks = append(chunks, c) } - fmt.Println("created chunks", len(chunks)) -} -func BenchmarkPlainAppender(b *testing.B) { - benchmarkAppender(b, func(sz int) Chunk { - return NewPlainChunk(sz) - }) -} - -func BenchmarkDoubleDeltaAppender(b *testing.B) { - benchmarkAppender(b, func(sz int) Chunk { - return NewDoubleDeltaChunk(sz) - }) + fmt.Println("num", b.N, "created chunks", len(chunks)) } diff --git a/chunks/doubledelta.go b/chunks/doubledelta.go deleted file mode 100644 index 0ac4655e39..0000000000 --- a/chunks/doubledelta.go +++ /dev/null @@ -1,192 +0,0 @@ -package chunks - -import ( - "encoding/binary" - "errors" - "io" - "math" - - "github.com/prometheus/common/model" -) - -// DoubleDeltaChunk stores delta-delta encoded sample data. -type DoubleDeltaChunk struct { - rawChunk -} - -// NewDoubleDeltaChunk returns a new chunk using double delta encoding. -func NewDoubleDeltaChunk(sz int) *DoubleDeltaChunk { - return &DoubleDeltaChunk{rawChunk: newRawChunk(sz, EncDoubleDelta)} -} - -// Iterator implements the Chunk interface. -func (c *DoubleDeltaChunk) Iterator() Iterator { - return &doubleDeltaIterator{d: c.d[1:c.l]} -} - -// Appender implements the Chunk interface. 
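
For reference, the encoding implemented by this deleted file stored the first sample raw, the second as deltas, and every further sample as the change of the delta, all varint encoded. A compact standalone sketch of that idea for integer timestamps follows; it mirrors the technique, not the deleted chunk's exact byte layout.

package main

import (
	"encoding/binary"
	"fmt"
)

// encodeDoubleDelta packs timestamps as t0, then t1-t0, then the change of
// the delta for every further sample, each as a signed varint.
func encodeDoubleDelta(ts []int64) []byte {
	var (
		out   []byte
		buf   [binary.MaxVarintLen64]byte
		delta int64
	)
	for i, t := range ts {
		var v int64
		switch i {
		case 0:
			v = t
		case 1:
			delta = t - ts[0]
			v = delta
		default:
			newDelta := t - ts[i-1]
			v = newDelta - delta
			delta = newDelta
		}
		n := binary.PutVarint(buf[:], v)
		out = append(out, buf[:n]...)
	}
	return out
}

func main() {
	ts := []int64{1000, 1010, 1020, 1031, 1041}
	fmt.Printf("%d timestamps -> %d bytes\n", len(ts), len(encodeDoubleDelta(ts)))
}
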
-func (c *DoubleDeltaChunk) Appender() Appender { - return &doubleDeltaAppender{c: &c.rawChunk} -} - -type doubleDeltaIterator struct { - d []byte - - err error - pos, num int - curT, curV int64 - nextT, nextV int64 - deltaV int64 - deltaT uint64 -} - -func (it *doubleDeltaIterator) Err() error { - return it.err -} - -func (it *doubleDeltaIterator) readPair() bool { - if len(it.d) == it.pos { - return false - } - var ( - n, m int - ddv int64 - ddt uint64 - ) - it.curT = it.nextT - it.curV = it.nextV - - if it.num > 1 { - ddt, n = binary.Uvarint(it.d[it.pos:]) - ddv, m = binary.Varint(it.d[it.pos+n:]) - it.deltaT += ddt - it.deltaV += ddv - it.nextT += int64(it.deltaT) - it.nextV += it.deltaV - } else if it.num == 1 { - it.deltaT, n = binary.Uvarint(it.d[it.pos:]) - it.deltaV, m = binary.Varint(it.d[it.pos+n:]) - it.nextT += int64(it.deltaT) - it.nextV += it.deltaV - } else { - it.nextT, n = binary.Varint(it.d[it.pos:]) - it.nextV, m = binary.Varint(it.d[it.pos+n:]) - } - it.pos += n + m - it.num++ - return true -} - -func (it *doubleDeltaIterator) First() (model.SamplePair, bool) { - it.pos = 0 - it.num = 0 - if !it.readPair() { - it.err = io.EOF - return model.SamplePair{}, false - } - return it.Next() -} - -func (it *doubleDeltaIterator) Seek(ts model.Time) (model.SamplePair, bool) { - if int64(ts) < it.nextT { - it.pos = 0 - it.num = 0 - if !it.readPair() { - it.err = io.EOF - return model.SamplePair{}, false - } - if _, ok := it.Next(); !ok { - return model.SamplePair{}, false - } - } - for { - if it.nextT > int64(ts) { - if it.num < 2 { - it.err = io.EOF - return model.SamplePair{}, false - } - return model.SamplePair{ - Timestamp: model.Time(it.curT), - Value: model.SampleValue(it.curV), - }, true - } - if _, ok := it.Next(); !ok { - return model.SamplePair{}, false - } - } -} - -func (it *doubleDeltaIterator) Next() (model.SamplePair, bool) { - if it.err == io.EOF { - return model.SamplePair{}, false - } - res := model.SamplePair{ - Timestamp: model.Time(it.nextT), - Value: model.SampleValue(it.nextV), - } - if !it.readPair() { - it.err = io.EOF - } - return res, true -} - -type doubleDeltaAppender struct { - c *rawChunk - buf [16]byte - num int // stored values so far. - - lastV, lastVDelta int64 - lastT int64 - lastTDelta uint64 -} - -func isInt(f model.SampleValue) (int64, bool) { - x, frac := math.Modf(float64(f)) - if frac != 0 { - return 0, false - } - return int64(x), true -} - -// ErrNoInteger is returned if a non-integer is appended to -// a double delta chunk. 
-var ErrNoInteger = errors.New("not an integer") - -func (a *doubleDeltaAppender) Append(ts model.Time, fv model.SampleValue) error { - v, ok := isInt(fv) - if !ok { - return ErrNoInteger - } - if a.num == 0 { - n := binary.PutVarint(a.buf[:], int64(ts)) - n += binary.PutVarint(a.buf[n:], v) - if err := a.c.append(a.buf[:n]); err != nil { - return err - } - a.lastT, a.lastV = int64(ts), v - a.num++ - return nil - } - if a.num == 1 { - a.lastTDelta, a.lastVDelta = uint64(int64(ts)-a.lastT), v-a.lastV - n := binary.PutUvarint(a.buf[:], a.lastTDelta) - n += binary.PutVarint(a.buf[n:], a.lastVDelta) - if err := a.c.append(a.buf[:n]); err != nil { - return err - } - } else { - predT, predV := a.lastT+int64(a.lastTDelta), a.lastV+a.lastVDelta - tdd := uint64(int64(ts) - predT) - vdd := v - predV - n := binary.PutUvarint(a.buf[:], tdd) - n += binary.PutVarint(a.buf[n:], vdd) - if err := a.c.append(a.buf[:n]); err != nil { - return err - } - a.lastTDelta += tdd - a.lastVDelta += vdd - } - a.lastT, a.lastV = int64(ts), v - a.num++ - return nil -} diff --git a/chunks/doubledelta_test.go b/chunks/doubledelta_test.go deleted file mode 100644 index d62725b972..0000000000 --- a/chunks/doubledelta_test.go +++ /dev/null @@ -1,58 +0,0 @@ -package chunks - -import ( - "io" - "math/rand" - "testing" - - "github.com/prometheus/common/model" - "github.com/stretchr/testify/require" -) - -func testDoubleDeltaChunk(t *testing.T) { - ts := model.Time(14345645) - v := int64(123123) - - var input []model.SamplePair - for i := 0; i < 2000; i++ { - ts += model.Time(rand.Int63n(100) + 1) - v += rand.Int63n(1000) - if rand.Int() > 0 { - v *= -1 - } - - input = append(input, model.SamplePair{ - Timestamp: ts, - Value: model.SampleValue(v), - }) - } - - c := NewDoubleDeltaChunk(rand.Intn(3000)) - - app := c.Appender() - for i, s := range input { - err := app.Append(s.Timestamp, s.Value) - if err == ErrChunkFull { - input = input[:i] - break - } - require.NoError(t, err, "at sample %d: %v", i, s) - } - - result := []model.SamplePair{} - - it := c.Iterator() - for s, ok := it.First(); ok; s, ok = it.Next() { - result = append(result, s) - } - if it.Err() != io.EOF { - require.NoError(t, it.Err()) - } - require.Equal(t, input, result) -} - -func TestDoubleDeltaChunk(t *testing.T) { - for i := 0; i < 10000; i++ { - testDoubleDeltaChunk(t) - } -} diff --git a/chunks/xor.go b/chunks/xor.go index 5766aebc97..8eda98aaec 100644 --- a/chunks/xor.go +++ b/chunks/xor.go @@ -1,46 +1,82 @@ package chunks import ( + "encoding/binary" "math" bits "github.com/dgryski/go-bits" - "github.com/prometheus/common/model" ) // XORChunk holds XOR encoded sample data. type XORChunk struct { - bstream - + b *bstream num uint16 sz int - - lastLen int - lastCount uint8 } // NewXORChunk returns a new chunk with XOR encoding of the given size. -func NewXORChunk(sz int) *XORChunk { - return &XORChunk{sz: sz} +func NewXORChunk(size int) *XORChunk { + b := make([]byte, 3, 64) + b[0] = byte(EncXOR) + + return &XORChunk{ + b: &bstream{stream: b, count: 0}, + sz: size, + num: 0, + } } -func (c *XORChunk) Data() []byte { - return nil +// Bytes returns the underlying byte slice of the chunk. +func (c *XORChunk) Bytes() []byte { + b := c.b.bytes() + // Lazily populate length bytes – probably not necessary to have the + // cache value in struct. + binary.LittleEndian.PutUint16(b[1:3], c.num) + return b } // Appender implements the Chunk interface. 
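
The three bytes reserved in NewXORChunk above form the chunk header: one byte for the Encoding and a little-endian uint16 sample count, which Bytes fills in lazily. A sketch of decoding that header, assuming exactly this layout; readHeader is an illustrative helper, not part of the package.

package main

import (
	"encoding/binary"
	"fmt"
)

// header describes the 3-byte prefix used by XORChunk in this diff:
// byte 0 is the Encoding, bytes 1-2 hold the sample count (little endian).
type header struct {
	enc byte
	num uint16
}

func readHeader(d []byte) (header, error) {
	if len(d) < 3 {
		return header{}, fmt.Errorf("chunk too short: %d bytes", len(d))
	}
	return header{
		enc: d[0],
		num: binary.LittleEndian.Uint16(d[1:3]),
	}, nil
}

func main() {
	// A hand-built example: encoding 1 (EncXOR in this diff), 42 samples,
	// followed by the bit stream payload.
	raw := []byte{1, 42, 0, 0xde, 0xad, 0xbe, 0xef}
	h, err := readHeader(raw)
	if err != nil {
		panic(err)
	}
	fmt.Printf("encoding=%d samples=%d payload=%d bytes\n", h.enc, h.num, len(raw)-3)
}
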
-func (c *XORChunk) Appender() Appender { - return &xorAppender{c: c} +func (c *XORChunk) Appender() (Appender, error) { + it := c.iterator() + + // To get an appender we must know the state it would have if we had + // appended all existing data from scratch. + // We iterate through the end and populate via the iterator's state. + for it.Next() { + } + if err := it.Err(); err != nil { + return nil, err + } + + return &xorAppender{ + c: c, + b: c.b, + t: it.t, + v: it.val, + tDelta: it.tDelta, + leading: it.leading, + trailing: it.trailing, + }, nil +} + +func (c *XORChunk) iterator() *xorIterator { + // Should iterators guarantee to act on a copy of the data so it doesn't lock append? + // When using striped locks to guard access to chunks, probably yes. + // Could only copy data if the chunk is not completed yet. + return &xorIterator{ + br: newBReader(c.b.bytes()[3:]), + numTotal: c.num, + } } // Iterator implements the Chunk interface. func (c *XORChunk) Iterator() Iterator { - br := c.bstream.clone() - br.count = 8 - return &xorIterator{br: br, numTotal: c.num} + return fancyIterator{c.iterator()} } type xorAppender struct { c *XORChunk + b *bstream t int64 v float64 @@ -48,26 +84,20 @@ type xorAppender struct { leading uint8 trailing uint8 - finished bool } -func (a *xorAppender) Append(ts model.Time, v model.SampleValue) error { - // TODO(fabxc): remove Prometheus types from interface. - return a.append(int64(ts), float64(v)) -} - -func (a *xorAppender) append(t int64, v float64) error { +func (a *xorAppender) Append(t int64, v float64) error { var tDelta uint64 if a.c.num == 0 { // TODO: store varint time? - a.c.writeBits(uint64(t), 64) - a.c.writeBits(math.Float64bits(v), 64) + a.b.writeBits(uint64(t), 64) + a.b.writeBits(math.Float64bits(v), 64) } else if a.c.num == 1 { tDelta = uint64(t - a.t) // TODO: use varint or other encoding for first delta? - a.c.writeBits(tDelta, 64) + a.b.writeBits(tDelta, 64) a.writeVDelta(v) } else { @@ -78,25 +108,25 @@ func (a *xorAppender) append(t int64, v float64) error { // Thus we use higher value range steps with larger bit size. 
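
The switch that follows encodes the timestamp delta-of-delta with the Gorilla-style prefix codes '0', '10', '110', '1110', '1111', widened here to 14/17/20/64 payload bits for millisecond resolution. A standalone sketch of the same bucket choice; dodBucket is a hypothetical helper that only reports which code and payload width would be written, the total cost per sample being prefix length plus payload bits.

package main

import "fmt"

// dodBucket returns the prefix code and payload width chosen for a timestamp
// delta-of-delta, mirroring the ranges used in the switch below.
func dodBucket(dod int64) (prefix string, payloadBits int) {
	switch {
	case dod == 0:
		return "0", 0
	case -8191 <= dod && dod <= 8192:
		return "10", 14
	case -65535 <= dod && dod <= 65536:
		return "110", 17
	case -524287 <= dod && dod <= 524288:
		return "1110", 20
	default:
		return "1111", 64
	}
}

func main() {
	for _, dod := range []int64{0, 5, -5000, 40000, 1 << 22} {
		p, n := dodBucket(dod)
		fmt.Printf("dod=%-8d prefix=%-4s payload=%d bits\n", dod, p, n)
	}
}

Regularly spaced scrapes make dod == 0 the common case, which is why it gets the single-bit code.
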
switch { case dod == 0: - a.c.writeBit(zero) + a.b.writeBit(zero) case -8191 <= dod && dod <= 8192: - a.c.writeBits(0x02, 2) // '10' - a.c.writeBits(uint64(dod), 14) + a.b.writeBits(0x02, 2) // '10' + a.b.writeBits(uint64(dod), 14) case -65535 <= dod && dod <= 65536: - a.c.writeBits(0x06, 3) // '110' - a.c.writeBits(uint64(dod), 17) + a.b.writeBits(0x06, 3) // '110' + a.b.writeBits(uint64(dod), 17) case -524287 <= dod && dod <= 524288: - a.c.writeBits(0x0e, 4) // '1110' - a.c.writeBits(uint64(dod), 20) + a.b.writeBits(0x0e, 4) // '1110' + a.b.writeBits(uint64(dod), 20) default: - a.c.writeBits(0x0f, 4) // '1111' - a.c.writeBits(uint64(dod), 64) + a.b.writeBits(0x0f, 4) // '1111' + a.b.writeBits(uint64(dod), 64) } a.writeVDelta(v) } - if len(a.c.stream) > a.c.sz { + if len(a.b.bytes()) > a.c.sz { return ErrChunkFull } @@ -104,8 +134,7 @@ func (a *xorAppender) append(t int64, v float64) error { a.v = v a.c.num++ a.tDelta = tDelta - a.c.lastCount = a.c.count - a.c.lastLen = len(a.c.stream) + return nil } @@ -113,10 +142,10 @@ func (a *xorAppender) writeVDelta(v float64) { vDelta := math.Float64bits(v) ^ math.Float64bits(a.v) if vDelta == 0 { - a.c.writeBit(zero) + a.b.writeBit(zero) return } - a.c.writeBit(one) + a.b.writeBit(one) leading := uint8(bits.Clz(vDelta)) trailing := uint8(bits.Ctz(vDelta)) @@ -128,20 +157,20 @@ func (a *xorAppender) writeVDelta(v float64) { // TODO(dgryski): check if it's 'cheaper' to reset the leading/trailing bits instead if a.leading != ^uint8(0) && leading >= a.leading && trailing >= a.trailing { - a.c.writeBit(zero) - a.c.writeBits(vDelta>>a.trailing, 64-int(a.leading)-int(a.trailing)) + a.b.writeBit(zero) + a.b.writeBits(vDelta>>a.trailing, 64-int(a.leading)-int(a.trailing)) } else { a.leading, a.trailing = leading, trailing - a.c.writeBit(one) - a.c.writeBits(uint64(leading), 5) + a.b.writeBit(one) + a.b.writeBits(uint64(leading), 5) // Note that if leading == trailing == 0, then sigbits == 64. But that value doesn't actually fit into the 6 bits we have. // Luckily, we never need to encode 0 significant bits, since that would put us in the other case (vdelta == 0). // So instead we write out a 0 and adjust it back to 64 on unpacking. 
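
The value path XORs each float with its predecessor and stores only the significant bits between the leading and trailing zeros; writing sigbits as 0 and reading it back as 64 is the adjustment the comment above describes. A self-contained sketch of just that bookkeeping, using the standard library's math/bits rather than the go-bits package used in this file; xorLayout is an illustrative helper, not package code.

package main

import (
	"fmt"
	"math"
	"math/bits"
)

// xorLayout reports how a value would be stored relative to the previous one:
// the XOR of the two bit patterns, its leading/trailing zero counts, and the
// number of significant bits in between.
func xorLayout(prev, cur float64) (delta uint64, leading, trailing, sig int) {
	delta = math.Float64bits(cur) ^ math.Float64bits(prev)
	if delta == 0 {
		return 0, 0, 0, 0 // identical value: a single '0' control bit suffices
	}
	leading = bits.LeadingZeros64(delta)
	trailing = bits.TrailingZeros64(delta)
	sig = 64 - leading - trailing
	return delta, leading, trailing, sig
}

func main() {
	samples := []float64{12.0, 12.0, 24.0, 15.5, 14.0625}
	for i := 1; i < len(samples); i++ {
		d, l, t, s := xorLayout(samples[i-1], samples[i])
		// The encoder would write s bits of d>>t; s == 64 must be stored as 0
		// in the 6-bit length field, as the comment above explains.
		fmt.Printf("%g -> %g: xor=%016x leading=%2d trailing=%2d sigbits=%2d\n",
			samples[i-1], samples[i], d, l, t, s)
	}
}
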
sigbits := 64 - leading - trailing - a.c.writeBits(uint64(sigbits), 6) - a.c.writeBits(vDelta>>trailing, int(sigbits)) + a.b.writeBits(uint64(sigbits), 6) + a.b.writeBits(vDelta>>trailing, int(sigbits)) } } @@ -156,7 +185,7 @@ type xorIterator struct { leading uint8 trailing uint8 - tDelta int64 + tDelta uint64 err error } @@ -164,16 +193,15 @@ func (it *xorIterator) Values() (int64, float64) { return it.t, it.val } -func (it *xorIterator) NextB() bool { +func (it *xorIterator) Err() error { + return it.err +} + +func (it *xorIterator) Next() bool { if it.err != nil || it.numRead == it.numTotal { return false } - var d byte - var dod int32 - var sz uint - var tDelta int64 - if it.numRead == 0 { t, err := it.br.readBits(64) if err != nil { @@ -197,12 +225,13 @@ func (it *xorIterator) NextB() bool { it.err = err return false } - it.tDelta = int64(tDelta) - it.t = it.t + it.tDelta + it.tDelta = tDelta + it.t = it.t + int64(it.tDelta) - goto ReadValue + return it.readValue() } + var d byte // read delta-of-delta for i := 0; i < 4; i++ { d <<= 1 @@ -216,7 +245,8 @@ func (it *xorIterator) NextB() bool { } d |= 1 } - + var sz uint8 + var dod int64 switch d { case 0x00: // dod == 0 @@ -233,7 +263,7 @@ func (it *xorIterator) NextB() bool { return false } - dod = int32(bits) + dod = int64(bits) } if sz != 0 { @@ -246,16 +276,16 @@ func (it *xorIterator) NextB() bool { // or something bits = bits - (1 << sz) } - dod = int32(bits) + dod = int64(bits) } - tDelta = it.tDelta + int64(dod) + it.tDelta = uint64(int64(it.tDelta) + dod) + it.t = it.t + int64(it.tDelta) - it.tDelta = tDelta - it.t = it.t + it.tDelta + return it.readValue() +} -ReadValue: - // read compressed value +func (it *xorIterator) readValue() bool { bit, err := it.br.readBit() if err != nil { it.err = err @@ -265,8 +295,8 @@ ReadValue: if bit == zero { // it.val = it.val } else { - bit, itErr := it.br.readBit() - if itErr != nil { + bit, err := it.br.readBit() + if err != nil { it.err = err return false } @@ -308,19 +338,3 @@ ReadValue: it.numRead++ return true } - -func (it *xorIterator) First() (model.SamplePair, bool) { - return model.SamplePair{}, false -} - -func (it *xorIterator) Seek(ts model.Time) (model.SamplePair, bool) { - return model.SamplePair{}, false -} - -func (it *xorIterator) Next() (model.SamplePair, bool) { - return model.SamplePair{}, false -} - -func (it *xorIterator) Err() error { - return it.err -} diff --git a/chunks/xor_test.go b/chunks/xor_test.go deleted file mode 100644 index fc95a34a19..0000000000 --- a/chunks/xor_test.go +++ /dev/null @@ -1,61 +0,0 @@ -package chunks - -import ( - "math/rand" - "testing" - - "github.com/prometheus/common/model" - "github.com/stretchr/testify/require" -) - -func testXORChunk(t *testing.T) { - ts := model.Time(124213233) - v := int64(99954541) - - var input []model.SamplePair - for i := 0; i < 10000; i++ { - ts += model.Time(rand.Int63n(50000) + 1) - v += rand.Int63n(1000) - if rand.Int() > 0 { - v *= -1 - } - - input = append(input, model.SamplePair{ - Timestamp: ts, - Value: model.SampleValue(v), - }) - } - - c := NewXORChunk(rand.Intn(3000)) - - app := c.Appender() - for i, s := range input { - err := app.Append(s.Timestamp, s.Value) - if err == ErrChunkFull { - input = input[:i] - break - } - require.NoError(t, err, "at sample %d: %v", i, s) - } - - result := []model.SamplePair{} - - it := c.Iterator().(*xorIterator) - for { - ok := it.NextB() - if !ok { - break - } - t, v := it.Values() - result = append(result, model.SamplePair{Timestamp: model.Time(t), Value: 
model.SampleValue(v)}) - } - - require.NoError(t, it.Err()) - require.Equal(t, input, result) -} - -func TestXORChunk(t *testing.T) { - for i := 0; i < 10; i++ { - testXORChunk(t) - } -}
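
The deleted test above exercised the old model.SamplePair API and is not replaced in this diff. A round-trip test against the new int64/float64 iterator could look roughly like the sketch below; it reuses the pair helper type and the testify style from chunk_test.go in this diff, but the test itself is illustrative and not part of the change.

package chunks

import (
	"io"
	"math/rand"
	"testing"

	"github.com/stretchr/testify/require"
)

func TestXORChunkRoundtrip(t *testing.T) {
	ts := int64(124213233)
	v := 99954.541

	var input []pair // pair is the helper type defined in chunk_test.go above
	for i := 0; i < 10000; i++ {
		ts += rand.Int63n(50000) + 1
		v = rand.Float64()
		input = append(input, pair{t: ts, v: v})
	}

	c := NewXORChunk(rand.Intn(3000))

	app, err := c.Appender()
	require.NoError(t, err)

	for i, s := range input {
		err := app.Append(s.t, s.v)
		if err == ErrChunkFull {
			input = input[:i]
			break
		}
		require.NoError(t, err, "at sample %d: %v", i, s)
	}

	result := []pair{}

	it := c.Iterator()
	for it.Next() {
		ts, v := it.Values()
		result = append(result, pair{t: ts, v: v})
	}
	if it.Err() != io.EOF {
		require.NoError(t, it.Err())
	}
	require.Equal(t, input, result)
}
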