Use more varbit in histogram chunks

This adds bit buckets for larger numbers to varbit encoding and also
an unsigned version of varbit encoding.

Then, varbit encoding is used for all the histogram chunk data instead
of varint.

Signed-off-by: beorn7 <beorn@grafana.com>
This commit is contained in:
beorn7 2021-10-13 20:03:35 +02:00
parent 7309c20e7e
commit 7093b089f2
4 changed files with 249 additions and 55 deletions

View file

@ -153,8 +153,6 @@ func (c *HistogramChunk) Appender() (Appender, error) {
sum: it.sum, sum: it.sum,
leading: it.leading, leading: it.leading,
trailing: it.trailing, trailing: it.trailing,
buf64: make([]byte, binary.MaxVarintLen64),
} }
if binary.BigEndian.Uint16(a.b.bytes()) == 0 { if binary.BigEndian.Uint16(a.b.bytes()) == 0 {
a.leading = 0xff a.leading = 0xff
@ -222,20 +220,6 @@ type HistogramAppender struct {
sum float64 sum float64
leading uint8 leading uint8
trailing uint8 trailing uint8
buf64 []byte // For working on varint64's.
}
func putVarint(b *bstream, buf []byte, x int64) {
for _, byt := range buf[:binary.PutVarint(buf, x)] {
b.writeByte(byt)
}
}
func putUvarint(b *bstream, buf []byte, x uint64) {
for _, byt := range buf[:binary.PutUvarint(buf, x)] {
b.writeByte(byt)
}
} }
// Append implements Appender. This implementation panics because normal float // Append implements Appender. This implementation panics because normal float
@ -418,18 +402,21 @@ func (a *HistogramAppender) AppendHistogram(t int64, h histogram.Histogram) {
a.nBucketsDelta = make([]int64, numNBuckets) a.nBucketsDelta = make([]int64, numNBuckets)
// Now store the actual data. // Now store the actual data.
putVarint(a.b, a.buf64, t) putVarbitInt(a.b, t)
putUvarint(a.b, a.buf64, h.Count) // TODO(beorn7): Use putVarbitInt? putVarbitUint(a.b, h.Count)
putUvarint(a.b, a.buf64, h.ZeroCount) // TODO(beorn7): Use putVarbitInt? putVarbitUint(a.b, h.ZeroCount) //
a.b.writeBits(math.Float64bits(h.Sum), 64) a.b.writeBits(math.Float64bits(h.Sum), 64)
for _, buck := range h.PositiveBuckets { for _, b := range h.PositiveBuckets {
putVarint(a.b, a.buf64, buck) // TODO(beorn7): Use putVarbitInt? putVarbitInt(a.b, b)
} }
for _, buck := range h.NegativeBuckets { for _, b := range h.NegativeBuckets {
putVarint(a.b, a.buf64, buck) // TODO(beorn7): Use putVarbitInt? putVarbitInt(a.b, b)
} }
case 1: case 1:
tDelta = t - a.t tDelta = t - a.t
if tDelta < 0 {
panic("out of order timestamp")
}
cntDelta = int64(h.Count) - int64(a.cnt) cntDelta = int64(h.Count) - int64(a.cnt)
zCntDelta = int64(h.ZeroCount) - int64(a.zCnt) zCntDelta = int64(h.ZeroCount) - int64(a.zCnt)
@ -437,20 +424,20 @@ func (a *HistogramAppender) AppendHistogram(t int64, h histogram.Histogram) {
cntDelta, zCntDelta = 0, 0 cntDelta, zCntDelta = 0, 0
} }
putVarint(a.b, a.buf64, tDelta) // TODO(beorn7): This should probably be putUvarint. putVarbitUint(a.b, uint64(tDelta))
putVarint(a.b, a.buf64, cntDelta) // TODO(beorn7): Use putVarbitInt? putVarbitInt(a.b, cntDelta)
putVarint(a.b, a.buf64, zCntDelta) // TODO(beorn7): Use putVarbitInt? putVarbitInt(a.b, zCntDelta)
a.writeSumDelta(h.Sum) a.writeSumDelta(h.Sum)
for i, buck := range h.PositiveBuckets { for i, b := range h.PositiveBuckets {
delta := buck - a.pBuckets[i] delta := b - a.pBuckets[i]
putVarint(a.b, a.buf64, delta) // TODO(beorn7): Use putVarbitInt? putVarbitInt(a.b, delta)
a.pBucketsDelta[i] = delta a.pBucketsDelta[i] = delta
} }
for i, buck := range h.NegativeBuckets { for i, b := range h.NegativeBuckets {
delta := buck - a.nBuckets[i] delta := b - a.nBuckets[i]
putVarint(a.b, a.buf64, delta) // TODO(beorn7): Use putVarbitInt? putVarbitInt(a.b, delta)
a.nBucketsDelta[i] = delta a.nBucketsDelta[i] = delta
} }
@ -721,21 +708,21 @@ func (it *histogramIterator) Next() bool {
} }
// Now read the actual data. // Now read the actual data.
t, err := binary.ReadVarint(&it.br) t, err := readVarbitInt(&it.br)
if err != nil { if err != nil {
it.err = err it.err = err
return false return false
} }
it.t = t it.t = t
cnt, err := binary.ReadUvarint(&it.br) cnt, err := readVarbitUint(&it.br)
if err != nil { if err != nil {
it.err = err it.err = err
return false return false
} }
it.cnt = cnt it.cnt = cnt
zcnt, err := binary.ReadUvarint(&it.br) zcnt, err := readVarbitUint(&it.br)
if err != nil { if err != nil {
it.err = err it.err = err
return false return false
@ -750,7 +737,7 @@ func (it *histogramIterator) Next() bool {
it.sum = math.Float64frombits(sum) it.sum = math.Float64frombits(sum)
for i := range it.pBuckets { for i := range it.pBuckets {
v, err := binary.ReadVarint(&it.br) v, err := readVarbitInt(&it.br)
if err != nil { if err != nil {
it.err = err it.err = err
return false return false
@ -758,7 +745,7 @@ func (it *histogramIterator) Next() bool {
it.pBuckets[i] = v it.pBuckets[i] = v
} }
for i := range it.nBuckets { for i := range it.nBuckets {
v, err := binary.ReadVarint(&it.br) v, err := readVarbitInt(&it.br)
if err != nil { if err != nil {
it.err = err it.err = err
return false return false
@ -771,15 +758,15 @@ func (it *histogramIterator) Next() bool {
} }
if it.numRead == 1 { if it.numRead == 1 {
tDelta, err := binary.ReadVarint(&it.br) tDelta, err := readVarbitUint(&it.br)
if err != nil { if err != nil {
it.err = err it.err = err
return false return false
} }
it.tDelta = tDelta it.tDelta = int64(tDelta)
it.t += int64(it.tDelta) it.t += it.tDelta
cntDelta, err := binary.ReadVarint(&it.br) cntDelta, err := readVarbitInt(&it.br)
if err != nil { if err != nil {
it.err = err it.err = err
return false return false
@ -787,7 +774,7 @@ func (it *histogramIterator) Next() bool {
it.cntDelta = cntDelta it.cntDelta = cntDelta
it.cnt = uint64(int64(it.cnt) + it.cntDelta) it.cnt = uint64(int64(it.cnt) + it.cntDelta)
zcntDelta, err := binary.ReadVarint(&it.br) zcntDelta, err := readVarbitInt(&it.br)
if err != nil { if err != nil {
it.err = err it.err = err
return false return false
@ -806,7 +793,7 @@ func (it *histogramIterator) Next() bool {
} }
for i := range it.pBuckets { for i := range it.pBuckets {
delta, err := binary.ReadVarint(&it.br) delta, err := readVarbitInt(&it.br)
if err != nil { if err != nil {
it.err = err it.err = err
return false return false
@ -816,7 +803,7 @@ func (it *histogramIterator) Next() bool {
} }
for i := range it.nBuckets { for i := range it.nBuckets {
delta, err := binary.ReadVarint(&it.br) delta, err := readVarbitInt(&it.br)
if err != nil { if err != nil {
it.err = err it.err = err
return false return false

View file

@ -27,7 +27,7 @@ func writeHistogramChunkLayout(b *bstream, schema int32, zeroThreshold float64,
func putHistogramChunkLayoutSpans(b *bstream, spans []histogram.Span) { func putHistogramChunkLayoutSpans(b *bstream, spans []histogram.Span) {
putVarbitInt(b, int64(len(spans))) putVarbitInt(b, int64(len(spans)))
for _, s := range spans { for _, s := range spans {
putVarbitInt(b, int64(s.Length)) putVarbitUint(b, uint64(s.Length))
putVarbitInt(b, int64(s.Offset)) putVarbitInt(b, int64(s.Offset))
} }
} }
@ -69,7 +69,7 @@ func readHistogramChunkLayoutSpans(b *bstreamReader) ([]histogram.Span, error) {
} }
for i := 0; i < int(num); i++ { for i := 0; i < int(num); i++ {
length, err := readVarbitInt(b) length, err := readVarbitUint(b)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View file

@ -15,6 +15,9 @@ package chunkenc
import ( import (
"math" "math"
"math/bits"
"github.com/pkg/errors"
) )
// putVarbitFloat writes a float64 using varbit encoding. It does so by // putVarbitFloat writes a float64 using varbit encoding. It does so by
@ -53,7 +56,8 @@ func readVarbitFloat(b *bstreamReader) (float64, error) {
} }
// putVarbitInt writes an int64 using varbit encoding with a bit bucketing // putVarbitInt writes an int64 using varbit encoding with a bit bucketing
// optimized for the dod's observed in histogram buckets. // optimized for the dod's observed in histogram buckets, plus a few additional
// buckets for large numbers.
// //
// TODO(Dieterbe): We could improve this further: Each branch doesn't need to // TODO(Dieterbe): We could improve this further: Each branch doesn't need to
// support any values of any of the prior branches. So we can expand the range // support any values of any of the prior branches. So we can expand the range
@ -62,22 +66,31 @@ func readVarbitFloat(b *bstreamReader) (float64, error) {
// center-piece we skip). // center-piece we skip).
func putVarbitInt(b *bstream, val int64) { func putVarbitInt(b *bstream, val int64) {
switch { switch {
case val == 0: case val == 0: // Precisely 0, needs 1 bit.
b.writeBit(zero) b.writeBit(zero)
case bitRange(val, 3): // -3 <= val <= 4 case bitRange(val, 3): // -3 <= val <= 4, needs 5 bits.
b.writeBits(0b10, 2) b.writeBits(0b10, 2)
b.writeBits(uint64(val), 3) b.writeBits(uint64(val), 3)
case bitRange(val, 6): // -31 <= val <= 32 case bitRange(val, 6): // -31 <= val <= 32, 9 bits.
b.writeBits(0b110, 3) b.writeBits(0b110, 3)
b.writeBits(uint64(val), 6) b.writeBits(uint64(val), 6)
case bitRange(val, 9): // -255 <= val <= 256 case bitRange(val, 9): // -255 <= val <= 256, 13 bits.
b.writeBits(0b1110, 4) b.writeBits(0b1110, 4)
b.writeBits(uint64(val), 9) b.writeBits(uint64(val), 9)
case bitRange(val, 12): // -2047 <= val <= 2048 case bitRange(val, 12): // -2047 <= val <= 2048, 17 bits.
b.writeBits(0b11110, 5) b.writeBits(0b11110, 5)
b.writeBits(uint64(val), 12) b.writeBits(uint64(val), 12)
case bitRange(val, 18): // -131071 <= val <= 131072, 3 bytes.
b.writeBits(0b111110, 6)
b.writeBits(uint64(val), 18)
case bitRange(val, 25): // -16777215 <= val <= 16777216, 4 bytes.
b.writeBits(0b1111110, 7)
b.writeBits(uint64(val), 25)
case bitRange(val, 56): // -36028797018963967 <= val <= 36028797018963968, 8 bytes.
b.writeBits(0b11111110, 8)
b.writeBits(uint64(val), 56)
default: default:
b.writeBits(0b11111, 5) b.writeBits(0b11111111, 8) // Worst case, needs 9 bytes.
b.writeBits(uint64(val), 64) b.writeBits(uint64(val), 64)
} }
} }
@ -85,7 +98,7 @@ func putVarbitInt(b *bstream, val int64) {
// readVarbitInt reads an int64 encoced with putVarbitInt. // readVarbitInt reads an int64 encoced with putVarbitInt.
func readVarbitInt(b *bstreamReader) (int64, error) { func readVarbitInt(b *bstreamReader) (int64, error) {
var d byte var d byte
for i := 0; i < 5; i++ { for i := 0; i < 8; i++ {
d <<= 1 d <<= 1
bit, err := b.readBitFast() bit, err := b.readBitFast()
if err != nil { if err != nil {
@ -114,7 +127,13 @@ func readVarbitInt(b *bstreamReader) (int64, error) {
sz = 9 sz = 9
case 0b11110: case 0b11110:
sz = 12 sz = 12
case 0b11111: case 0b111110:
sz = 18
case 0b1111110:
sz = 25
case 0b11111110:
sz = 56
case 0b11111111:
// Do not use fast because it's very unlikely it will succeed. // Do not use fast because it's very unlikely it will succeed.
bits, err := b.readBits(64) bits, err := b.readBits(64)
if err != nil { if err != nil {
@ -122,6 +141,8 @@ func readVarbitInt(b *bstreamReader) (int64, error) {
} }
val = int64(bits) val = int64(bits)
default:
return 0, errors.Errorf("invalid bit pattern %b", d)
} }
if sz != 0 { if sz != 0 {
@ -141,3 +162,104 @@ func readVarbitInt(b *bstreamReader) (int64, error) {
return val, nil return val, nil
} }
func bitRangeUint(x uint64, nbits int) bool {
return bits.LeadingZeros64(x) >= 64-nbits
}
// putVarbitUint writes a uint64 using varbit encoding. It uses the same bit
// buckets as putVarbitInt.
func putVarbitUint(b *bstream, val uint64) {
switch {
case val == 0: // Precisely 0, needs 1 bit.
b.writeBit(zero)
case bitRangeUint(val, 3): // val <= 7, needs 5 bits.
b.writeBits(0b10, 2)
b.writeBits(val, 3)
case bitRangeUint(val, 6): // val <= 63, 9 bits.
b.writeBits(0b110, 3)
b.writeBits(val, 6)
case bitRangeUint(val, 9): // val <= 511, 13 bits.
b.writeBits(0b1110, 4)
b.writeBits(val, 9)
case bitRangeUint(val, 12): // val <= 4095, 17 bits.
b.writeBits(0b11110, 5)
b.writeBits(val, 12)
case bitRangeUint(val, 18): // val <= 262143, 3 bytes.
b.writeBits(0b111110, 6)
b.writeBits(val, 18)
case bitRangeUint(val, 25): // val <= 33554431, 4 bytes.
b.writeBits(0b1111110, 7)
b.writeBits(val, 25)
case bitRangeUint(val, 56): // val <= 72057594037927935, 8 bytes.
b.writeBits(0b11111110, 8)
b.writeBits(val, 56)
default:
b.writeBits(0b11111111, 8) // Worst case, needs 9 bytes.
b.writeBits(val, 64)
}
}
// readVarbitUint reads a uint64 encoced with putVarbitUint.
func readVarbitUint(b *bstreamReader) (uint64, error) {
var d byte
for i := 0; i < 8; i++ {
d <<= 1
bit, err := b.readBitFast()
if err != nil {
bit, err = b.readBit()
}
if err != nil {
return 0, err
}
if bit == zero {
break
}
d |= 1
}
var (
bits uint64
sz uint8
err error
)
switch d {
case 0b0:
// val == 0
case 0b10:
sz = 3
case 0b110:
sz = 6
case 0b1110:
sz = 9
case 0b11110:
sz = 12
case 0b111110:
sz = 18
case 0b1111110:
sz = 25
case 0b11111110:
sz = 56
case 0b11111111:
// Do not use fast because it's very unlikely it will succeed.
bits, err = b.readBits(64)
if err != nil {
return 0, err
}
default:
return 0, errors.Errorf("invalid bit pattern %b", d)
}
if sz != 0 {
bits, err = b.readBitsFast(sz)
if err != nil {
bits, err = b.readBits(sz)
}
if err != nil {
return 0, err
}
}
return bits, nil
}

View file

@ -0,0 +1,85 @@
// Copyright 2021 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package chunkenc
import (
"math"
"testing"
"github.com/stretchr/testify/require"
)
func TestVarbitInt(t *testing.T) {
numbers := []int64{
math.MinInt64,
-36028797018963968, -36028797018963967,
-16777216, -16777215,
-131072, -131071,
-2048, -2047,
-256, -255,
-32, -31,
-4, -3,
-1, 0, 1,
4, 5,
32, 33,
256, 257,
2048, 2049,
131072, 131073,
16777216, 16777217,
36028797018963968, 36028797018963969,
math.MaxInt64,
}
bs := bstream{}
for _, n := range numbers {
putVarbitInt(&bs, n)
}
bsr := newBReader(bs.bytes())
for _, want := range numbers {
got, err := readVarbitInt(&bsr)
require.NoError(t, err)
require.Equal(t, want, got)
}
}
func TestVarbitUint(t *testing.T) {
numbers := []uint64{
0, 1,
7, 8,
63, 64,
511, 512,
4095, 4096,
262143, 262144,
33554431, 33554432,
72057594037927935, 72057594037927936,
math.MaxUint64,
}
bs := bstream{}
for _, n := range numbers {
putVarbitUint(&bs, n)
}
bsr := newBReader(bs.bytes())
for _, want := range numbers {
got, err := readVarbitUint(&bsr)
require.NoError(t, err)
require.Equal(t, want, got)
}
}