diff --git a/tsdb/chunkenc/histogram.go b/tsdb/chunkenc/histogram.go index d1cd36469..e0aa8f83b 100644 --- a/tsdb/chunkenc/histogram.go +++ b/tsdb/chunkenc/histogram.go @@ -153,8 +153,6 @@ func (c *HistogramChunk) Appender() (Appender, error) { sum: it.sum, leading: it.leading, trailing: it.trailing, - - buf64: make([]byte, binary.MaxVarintLen64), } if binary.BigEndian.Uint16(a.b.bytes()) == 0 { a.leading = 0xff @@ -222,20 +220,6 @@ type HistogramAppender struct { sum float64 leading uint8 trailing uint8 - - buf64 []byte // For working on varint64's. -} - -func putVarint(b *bstream, buf []byte, x int64) { - for _, byt := range buf[:binary.PutVarint(buf, x)] { - b.writeByte(byt) - } -} - -func putUvarint(b *bstream, buf []byte, x uint64) { - for _, byt := range buf[:binary.PutUvarint(buf, x)] { - b.writeByte(byt) - } } // Append implements Appender. This implementation panics because normal float @@ -418,18 +402,21 @@ func (a *HistogramAppender) AppendHistogram(t int64, h histogram.Histogram) { a.nBucketsDelta = make([]int64, numNBuckets) // Now store the actual data. - putVarint(a.b, a.buf64, t) - putUvarint(a.b, a.buf64, h.Count) // TODO(beorn7): Use putVarbitInt? - putUvarint(a.b, a.buf64, h.ZeroCount) // TODO(beorn7): Use putVarbitInt? + putVarbitInt(a.b, t) + putVarbitUint(a.b, h.Count) + putVarbitUint(a.b, h.ZeroCount) // a.b.writeBits(math.Float64bits(h.Sum), 64) - for _, buck := range h.PositiveBuckets { - putVarint(a.b, a.buf64, buck) // TODO(beorn7): Use putVarbitInt? + for _, b := range h.PositiveBuckets { + putVarbitInt(a.b, b) } - for _, buck := range h.NegativeBuckets { - putVarint(a.b, a.buf64, buck) // TODO(beorn7): Use putVarbitInt? 
+ for _, b := range h.NegativeBuckets { + putVarbitInt(a.b, b) } case 1: tDelta = t - a.t + if tDelta < 0 { + panic("out of order timestamp") + } cntDelta = int64(h.Count) - int64(a.cnt) zCntDelta = int64(h.ZeroCount) - int64(a.zCnt) @@ -437,20 +424,20 @@ func (a *HistogramAppender) AppendHistogram(t int64, h histogram.Histogram) { cntDelta, zCntDelta = 0, 0 } - putVarint(a.b, a.buf64, tDelta) // TODO(beorn7): This should probably be putUvarint. - putVarint(a.b, a.buf64, cntDelta) // TODO(beorn7): Use putVarbitInt? - putVarint(a.b, a.buf64, zCntDelta) // TODO(beorn7): Use putVarbitInt? + putVarbitUint(a.b, uint64(tDelta)) + putVarbitInt(a.b, cntDelta) + putVarbitInt(a.b, zCntDelta) a.writeSumDelta(h.Sum) - for i, buck := range h.PositiveBuckets { - delta := buck - a.pBuckets[i] - putVarint(a.b, a.buf64, delta) // TODO(beorn7): Use putVarbitInt? + for i, b := range h.PositiveBuckets { + delta := b - a.pBuckets[i] + putVarbitInt(a.b, delta) a.pBucketsDelta[i] = delta } - for i, buck := range h.NegativeBuckets { - delta := buck - a.nBuckets[i] - putVarint(a.b, a.buf64, delta) // TODO(beorn7): Use putVarbitInt? + for i, b := range h.NegativeBuckets { + delta := b - a.nBuckets[i] + putVarbitInt(a.b, delta) a.nBucketsDelta[i] = delta } @@ -721,21 +708,21 @@ func (it *histogramIterator) Next() bool { } // Now read the actual data. 
- t, err := binary.ReadVarint(&it.br) + t, err := readVarbitInt(&it.br) if err != nil { it.err = err return false } it.t = t - cnt, err := binary.ReadUvarint(&it.br) + cnt, err := readVarbitUint(&it.br) if err != nil { it.err = err return false } it.cnt = cnt - zcnt, err := binary.ReadUvarint(&it.br) + zcnt, err := readVarbitUint(&it.br) if err != nil { it.err = err return false @@ -750,7 +737,7 @@ func (it *histogramIterator) Next() bool { it.sum = math.Float64frombits(sum) for i := range it.pBuckets { - v, err := binary.ReadVarint(&it.br) + v, err := readVarbitInt(&it.br) if err != nil { it.err = err return false @@ -758,7 +745,7 @@ func (it *histogramIterator) Next() bool { it.pBuckets[i] = v } for i := range it.nBuckets { - v, err := binary.ReadVarint(&it.br) + v, err := readVarbitInt(&it.br) if err != nil { it.err = err return false @@ -771,15 +758,15 @@ func (it *histogramIterator) Next() bool { } if it.numRead == 1 { - tDelta, err := binary.ReadVarint(&it.br) + tDelta, err := readVarbitUint(&it.br) if err != nil { it.err = err return false } - it.tDelta = tDelta - it.t += int64(it.tDelta) + it.tDelta = int64(tDelta) + it.t += it.tDelta - cntDelta, err := binary.ReadVarint(&it.br) + cntDelta, err := readVarbitInt(&it.br) if err != nil { it.err = err return false @@ -787,7 +774,7 @@ func (it *histogramIterator) Next() bool { it.cntDelta = cntDelta it.cnt = uint64(int64(it.cnt) + it.cntDelta) - zcntDelta, err := binary.ReadVarint(&it.br) + zcntDelta, err := readVarbitInt(&it.br) if err != nil { it.err = err return false @@ -806,7 +793,7 @@ func (it *histogramIterator) Next() bool { } for i := range it.pBuckets { - delta, err := binary.ReadVarint(&it.br) + delta, err := readVarbitInt(&it.br) if err != nil { it.err = err return false @@ -816,7 +803,7 @@ func (it *histogramIterator) Next() bool { } for i := range it.nBuckets { - delta, err := binary.ReadVarint(&it.br) + delta, err := readVarbitInt(&it.br) if err != nil { it.err = err return false diff --git 
a/tsdb/chunkenc/histogram_meta.go b/tsdb/chunkenc/histogram_meta.go index cc692006a..dd1d876d3 100644 --- a/tsdb/chunkenc/histogram_meta.go +++ b/tsdb/chunkenc/histogram_meta.go @@ -27,7 +27,7 @@ func writeHistogramChunkLayout(b *bstream, schema int32, zeroThreshold float64, func putHistogramChunkLayoutSpans(b *bstream, spans []histogram.Span) { putVarbitInt(b, int64(len(spans))) for _, s := range spans { - putVarbitInt(b, int64(s.Length)) + putVarbitUint(b, uint64(s.Length)) putVarbitInt(b, int64(s.Offset)) } } @@ -69,7 +69,7 @@ func readHistogramChunkLayoutSpans(b *bstreamReader) ([]histogram.Span, error) { } for i := 0; i < int(num); i++ { - length, err := readVarbitInt(b) + length, err := readVarbitUint(b) if err != nil { return nil, err } diff --git a/tsdb/chunkenc/varbit.go b/tsdb/chunkenc/varbit.go index 3465c1af1..c17600e4a 100644 --- a/tsdb/chunkenc/varbit.go +++ b/tsdb/chunkenc/varbit.go @@ -15,6 +15,9 @@ package chunkenc import ( "math" + "math/bits" + + "github.com/pkg/errors" ) // putVarbitFloat writes a float64 using varbit encoding. It does so by @@ -53,7 +56,8 @@ func readVarbitFloat(b *bstreamReader) (float64, error) { } // putVarbitInt writes an int64 using varbit encoding with a bit bucketing -// optimized for the dod's observed in histogram buckets. +// optimized for the dod's observed in histogram buckets, plus a few additional +// buckets for large numbers. // // TODO(Dieterbe): We could improve this further: Each branch doesn't need to // support any values of any of the prior branches. So we can expand the range @@ -62,22 +66,31 @@ func readVarbitFloat(b *bstreamReader) (float64, error) { // center-piece we skip). func putVarbitInt(b *bstream, val int64) { switch { - case val == 0: + case val == 0: // Precisely 0, needs 1 bit. b.writeBit(zero) - case bitRange(val, 3): // -3 <= val <= 4 + case bitRange(val, 3): // -3 <= val <= 4, needs 5 bits. 
b.writeBits(0b10, 2) b.writeBits(uint64(val), 3) - case bitRange(val, 6): // -31 <= val <= 32 + case bitRange(val, 6): // -31 <= val <= 32, 9 bits. b.writeBits(0b110, 3) b.writeBits(uint64(val), 6) - case bitRange(val, 9): // -255 <= val <= 256 + case bitRange(val, 9): // -255 <= val <= 256, 13 bits. b.writeBits(0b1110, 4) b.writeBits(uint64(val), 9) - case bitRange(val, 12): // -2047 <= val <= 2048 + case bitRange(val, 12): // -2047 <= val <= 2048, 17 bits. b.writeBits(0b11110, 5) b.writeBits(uint64(val), 12) + case bitRange(val, 18): // -131071 <= val <= 131072, 3 bytes. + b.writeBits(0b111110, 6) + b.writeBits(uint64(val), 18) + case bitRange(val, 25): // -16777215 <= val <= 16777216, 4 bytes. + b.writeBits(0b1111110, 7) + b.writeBits(uint64(val), 25) + case bitRange(val, 56): // -36028797018963967 <= val <= 36028797018963968, 8 bytes. + b.writeBits(0b11111110, 8) + b.writeBits(uint64(val), 56) default: - b.writeBits(0b11111, 5) + b.writeBits(0b11111111, 8) // Worst case, needs 9 bytes. b.writeBits(uint64(val), 64) } } @@ -85,7 +98,7 @@ func putVarbitInt(b *bstream, val int64) { // readVarbitInt reads an int64 encoced with putVarbitInt. func readVarbitInt(b *bstreamReader) (int64, error) { var d byte - for i := 0; i < 5; i++ { + for i := 0; i < 8; i++ { d <<= 1 bit, err := b.readBitFast() if err != nil { @@ -114,7 +127,13 @@ func readVarbitInt(b *bstreamReader) (int64, error) { sz = 9 case 0b11110: sz = 12 - case 0b11111: + case 0b111110: + sz = 18 + case 0b1111110: + sz = 25 + case 0b11111110: + sz = 56 + case 0b11111111: // Do not use fast because it's very unlikely it will succeed. 
bits, err := b.readBits(64) if err != nil { @@ -122,6 +141,8 @@ func readVarbitInt(b *bstreamReader) (int64, error) { } val = int64(bits) + default: + return 0, errors.Errorf("invalid bit pattern %b", d) } if sz != 0 { @@ -141,3 +162,104 @@ func readVarbitInt(b *bstreamReader) (int64, error) { return val, nil } + +func bitRangeUint(x uint64, nbits int) bool { + return bits.LeadingZeros64(x) >= 64-nbits +} + +// putVarbitUint writes a uint64 using varbit encoding. It uses the same bit +// buckets as putVarbitInt. +func putVarbitUint(b *bstream, val uint64) { + switch { + case val == 0: // Precisely 0, needs 1 bit. + b.writeBit(zero) + case bitRangeUint(val, 3): // val <= 7, needs 5 bits. + b.writeBits(0b10, 2) + b.writeBits(val, 3) + case bitRangeUint(val, 6): // val <= 63, 9 bits. + b.writeBits(0b110, 3) + b.writeBits(val, 6) + case bitRangeUint(val, 9): // val <= 511, 13 bits. + b.writeBits(0b1110, 4) + b.writeBits(val, 9) + case bitRangeUint(val, 12): // val <= 4095, 17 bits. + b.writeBits(0b11110, 5) + b.writeBits(val, 12) + case bitRangeUint(val, 18): // val <= 262143, 3 bytes. + b.writeBits(0b111110, 6) + b.writeBits(val, 18) + case bitRangeUint(val, 25): // val <= 33554431, 4 bytes. + b.writeBits(0b1111110, 7) + b.writeBits(val, 25) + case bitRangeUint(val, 56): // val <= 72057594037927935, 8 bytes. + b.writeBits(0b11111110, 8) + b.writeBits(val, 56) + default: + b.writeBits(0b11111111, 8) // Worst case, needs 9 bytes. + b.writeBits(val, 64) + } +} + +// readVarbitUint reads a uint64 encoded with putVarbitUint. 
+func readVarbitUint(b *bstreamReader) (uint64, error) { + var d byte + for i := 0; i < 8; i++ { + d <<= 1 + bit, err := b.readBitFast() + if err != nil { + bit, err = b.readBit() + } + if err != nil { + return 0, err + } + if bit == zero { + break + } + d |= 1 + } + + var ( + bits uint64 + sz uint8 + err error + ) + + switch d { + case 0b0: + // val == 0 + case 0b10: + sz = 3 + case 0b110: + sz = 6 + case 0b1110: + sz = 9 + case 0b11110: + sz = 12 + case 0b111110: + sz = 18 + case 0b1111110: + sz = 25 + case 0b11111110: + sz = 56 + case 0b11111111: + // Do not use fast because it's very unlikely it will succeed. + bits, err = b.readBits(64) + if err != nil { + return 0, err + } + default: + return 0, errors.Errorf("invalid bit pattern %b", d) + } + + if sz != 0 { + bits, err = b.readBitsFast(sz) + if err != nil { + bits, err = b.readBits(sz) + } + if err != nil { + return 0, err + } + } + + return bits, nil +} diff --git a/tsdb/chunkenc/varbit_test.go b/tsdb/chunkenc/varbit_test.go new file mode 100644 index 000000000..8042b98dc --- /dev/null +++ b/tsdb/chunkenc/varbit_test.go @@ -0,0 +1,85 @@ +// Copyright 2021 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package chunkenc + +import ( + "math" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestVarbitInt(t *testing.T) { + numbers := []int64{ + math.MinInt64, + -36028797018963968, -36028797018963967, + -16777216, -16777215, + -131072, -131071, + -2048, -2047, + -256, -255, + -32, -31, + -4, -3, + -1, 0, 1, + 4, 5, + 32, 33, + 256, 257, + 2048, 2049, + 131072, 131073, + 16777216, 16777217, + 36028797018963968, 36028797018963969, + math.MaxInt64, + } + + bs := bstream{} + + for _, n := range numbers { + putVarbitInt(&bs, n) + } + + bsr := newBReader(bs.bytes()) + + for _, want := range numbers { + got, err := readVarbitInt(&bsr) + require.NoError(t, err) + require.Equal(t, want, got) + } +} + +func TestVarbitUint(t *testing.T) { + numbers := []uint64{ + 0, 1, + 7, 8, + 63, 64, + 511, 512, + 4095, 4096, + 262143, 262144, + 33554431, 33554432, + 72057594037927935, 72057594037927936, + math.MaxUint64, + } + + bs := bstream{} + + for _, n := range numbers { + putVarbitUint(&bs, n) + } + + bsr := newBReader(bs.bytes()) + + for _, want := range numbers { + got, err := readVarbitUint(&bsr) + require.NoError(t, err) + require.Equal(t, want, got) + } +}