From 7093b089f2d3eef4b190827f49e1ba7c62ce13a9 Mon Sep 17 00:00:00 2001 From: beorn7 Date: Wed, 13 Oct 2021 20:03:35 +0200 Subject: [PATCH] Use more varbit in histogram chunks This adds bit buckets for larger numbers to varbit encoding and also an unsigned version of varbit encoding. Then, varbit encoding is used for all the histogram chunk data instead of varint. Signed-off-by: beorn7 --- tsdb/chunkenc/histogram.go | 75 +++++++---------- tsdb/chunkenc/histogram_meta.go | 4 +- tsdb/chunkenc/varbit.go | 140 ++++++++++++++++++++++++++++++-- tsdb/chunkenc/varbit_test.go | 85 +++++++++++++++++++ 4 files changed, 249 insertions(+), 55 deletions(-) create mode 100644 tsdb/chunkenc/varbit_test.go diff --git a/tsdb/chunkenc/histogram.go b/tsdb/chunkenc/histogram.go index d1cd36469..e0aa8f83b 100644 --- a/tsdb/chunkenc/histogram.go +++ b/tsdb/chunkenc/histogram.go @@ -153,8 +153,6 @@ func (c *HistogramChunk) Appender() (Appender, error) { sum: it.sum, leading: it.leading, trailing: it.trailing, - - buf64: make([]byte, binary.MaxVarintLen64), } if binary.BigEndian.Uint16(a.b.bytes()) == 0 { a.leading = 0xff @@ -222,20 +220,6 @@ type HistogramAppender struct { sum float64 leading uint8 trailing uint8 - - buf64 []byte // For working on varint64's. -} - -func putVarint(b *bstream, buf []byte, x int64) { - for _, byt := range buf[:binary.PutVarint(buf, x)] { - b.writeByte(byt) - } -} - -func putUvarint(b *bstream, buf []byte, x uint64) { - for _, byt := range buf[:binary.PutUvarint(buf, x)] { - b.writeByte(byt) - } } // Append implements Appender. This implementation panics because normal float @@ -418,18 +402,21 @@ func (a *HistogramAppender) AppendHistogram(t int64, h histogram.Histogram) { a.nBucketsDelta = make([]int64, numNBuckets) // Now store the actual data. - putVarint(a.b, a.buf64, t) - putUvarint(a.b, a.buf64, h.Count) // TODO(beorn7): Use putVarbitInt? - putUvarint(a.b, a.buf64, h.ZeroCount) // TODO(beorn7): Use putVarbitInt? + putVarbitInt(a.b, t) + putVarbitUint(a.b, h.Count) + putVarbitUint(a.b, h.ZeroCount) // a.b.writeBits(math.Float64bits(h.Sum), 64) - for _, buck := range h.PositiveBuckets { - putVarint(a.b, a.buf64, buck) // TODO(beorn7): Use putVarbitInt? + for _, b := range h.PositiveBuckets { + putVarbitInt(a.b, b) } - for _, buck := range h.NegativeBuckets { - putVarint(a.b, a.buf64, buck) // TODO(beorn7): Use putVarbitInt? + for _, b := range h.NegativeBuckets { + putVarbitInt(a.b, b) } case 1: tDelta = t - a.t + if tDelta < 0 { + panic("out of order timestamp") + } cntDelta = int64(h.Count) - int64(a.cnt) zCntDelta = int64(h.ZeroCount) - int64(a.zCnt) @@ -437,20 +424,20 @@ func (a *HistogramAppender) AppendHistogram(t int64, h histogram.Histogram) { cntDelta, zCntDelta = 0, 0 } - putVarint(a.b, a.buf64, tDelta) // TODO(beorn7): This should probably be putUvarint. - putVarint(a.b, a.buf64, cntDelta) // TODO(beorn7): Use putVarbitInt? - putVarint(a.b, a.buf64, zCntDelta) // TODO(beorn7): Use putVarbitInt? + putVarbitUint(a.b, uint64(tDelta)) + putVarbitInt(a.b, cntDelta) + putVarbitInt(a.b, zCntDelta) a.writeSumDelta(h.Sum) - for i, buck := range h.PositiveBuckets { - delta := buck - a.pBuckets[i] - putVarint(a.b, a.buf64, delta) // TODO(beorn7): Use putVarbitInt? + for i, b := range h.PositiveBuckets { + delta := b - a.pBuckets[i] + putVarbitInt(a.b, delta) a.pBucketsDelta[i] = delta } - for i, buck := range h.NegativeBuckets { - delta := buck - a.nBuckets[i] - putVarint(a.b, a.buf64, delta) // TODO(beorn7): Use putVarbitInt? + for i, b := range h.NegativeBuckets { + delta := b - a.nBuckets[i] + putVarbitInt(a.b, delta) a.nBucketsDelta[i] = delta } @@ -721,21 +708,21 @@ func (it *histogramIterator) Next() bool { } // Now read the actual data. - t, err := binary.ReadVarint(&it.br) + t, err := readVarbitInt(&it.br) if err != nil { it.err = err return false } it.t = t - cnt, err := binary.ReadUvarint(&it.br) + cnt, err := readVarbitUint(&it.br) if err != nil { it.err = err return false } it.cnt = cnt - zcnt, err := binary.ReadUvarint(&it.br) + zcnt, err := readVarbitUint(&it.br) if err != nil { it.err = err return false @@ -750,7 +737,7 @@ func (it *histogramIterator) Next() bool { it.sum = math.Float64frombits(sum) for i := range it.pBuckets { - v, err := binary.ReadVarint(&it.br) + v, err := readVarbitInt(&it.br) if err != nil { it.err = err return false @@ -758,7 +745,7 @@ func (it *histogramIterator) Next() bool { it.pBuckets[i] = v } for i := range it.nBuckets { - v, err := binary.ReadVarint(&it.br) + v, err := readVarbitInt(&it.br) if err != nil { it.err = err return false @@ -771,15 +758,15 @@ func (it *histogramIterator) Next() bool { } if it.numRead == 1 { - tDelta, err := binary.ReadVarint(&it.br) + tDelta, err := readVarbitUint(&it.br) if err != nil { it.err = err return false } - it.tDelta = tDelta - it.t += int64(it.tDelta) + it.tDelta = int64(tDelta) + it.t += it.tDelta - cntDelta, err := binary.ReadVarint(&it.br) + cntDelta, err := readVarbitInt(&it.br) if err != nil { it.err = err return false @@ -787,7 +774,7 @@ func (it *histogramIterator) Next() bool { it.cntDelta = cntDelta it.cnt = uint64(int64(it.cnt) + it.cntDelta) - zcntDelta, err := binary.ReadVarint(&it.br) + zcntDelta, err := readVarbitInt(&it.br) if err != nil { it.err = err return false @@ -806,7 +793,7 @@ func (it *histogramIterator) Next() bool { } for i := range it.pBuckets { - delta, err := binary.ReadVarint(&it.br) + delta, err := readVarbitInt(&it.br) if err != nil { it.err = err return false @@ -816,7 +803,7 @@ func (it *histogramIterator) Next() bool { } for i := range it.nBuckets { - delta, err := binary.ReadVarint(&it.br) + delta, err := readVarbitInt(&it.br) if err != nil { it.err = err return false diff --git a/tsdb/chunkenc/histogram_meta.go b/tsdb/chunkenc/histogram_meta.go index cc692006a..dd1d876d3 100644 --- a/tsdb/chunkenc/histogram_meta.go +++ b/tsdb/chunkenc/histogram_meta.go @@ -27,7 +27,7 @@ func writeHistogramChunkLayout(b *bstream, schema int32, zeroThreshold float64, func putHistogramChunkLayoutSpans(b *bstream, spans []histogram.Span) { putVarbitInt(b, int64(len(spans))) for _, s := range spans { - putVarbitInt(b, int64(s.Length)) + putVarbitUint(b, uint64(s.Length)) putVarbitInt(b, int64(s.Offset)) } } @@ -69,7 +69,7 @@ func readHistogramChunkLayoutSpans(b *bstreamReader) ([]histogram.Span, error) { } for i := 0; i < int(num); i++ { - length, err := readVarbitInt(b) + length, err := readVarbitUint(b) if err != nil { return nil, err } diff --git a/tsdb/chunkenc/varbit.go b/tsdb/chunkenc/varbit.go index 3465c1af1..c17600e4a 100644 --- a/tsdb/chunkenc/varbit.go +++ b/tsdb/chunkenc/varbit.go @@ -15,6 +15,9 @@ package chunkenc import ( "math" + "math/bits" + + "github.com/pkg/errors" ) // putVarbitFloat writes a float64 using varbit encoding. It does so by @@ -53,7 +56,8 @@ func readVarbitFloat(b *bstreamReader) (float64, error) { } // putVarbitInt writes an int64 using varbit encoding with a bit bucketing -// optimized for the dod's observed in histogram buckets. +// optimized for the dod's observed in histogram buckets, plus a few additional +// buckets for large numbers. // // TODO(Dieterbe): We could improve this further: Each branch doesn't need to // support any values of any of the prior branches. So we can expand the range @@ -62,22 +66,31 @@ func readVarbitFloat(b *bstreamReader) (float64, error) { // center-piece we skip). func putVarbitInt(b *bstream, val int64) { switch { - case val == 0: + case val == 0: // Precisely 0, needs 1 bit. b.writeBit(zero) - case bitRange(val, 3): // -3 <= val <= 4 + case bitRange(val, 3): // -3 <= val <= 4, needs 5 bits. b.writeBits(0b10, 2) b.writeBits(uint64(val), 3) - case bitRange(val, 6): // -31 <= val <= 32 + case bitRange(val, 6): // -31 <= val <= 32, 9 bits. b.writeBits(0b110, 3) b.writeBits(uint64(val), 6) - case bitRange(val, 9): // -255 <= val <= 256 + case bitRange(val, 9): // -255 <= val <= 256, 13 bits. b.writeBits(0b1110, 4) b.writeBits(uint64(val), 9) - case bitRange(val, 12): // -2047 <= val <= 2048 + case bitRange(val, 12): // -2047 <= val <= 2048, 17 bits. b.writeBits(0b11110, 5) b.writeBits(uint64(val), 12) + case bitRange(val, 18): // -131071 <= val <= 131072, 3 bytes. + b.writeBits(0b111110, 6) + b.writeBits(uint64(val), 18) + case bitRange(val, 25): // -16777215 <= val <= 16777216, 4 bytes. + b.writeBits(0b1111110, 7) + b.writeBits(uint64(val), 25) + case bitRange(val, 56): // -36028797018963967 <= val <= 36028797018963968, 8 bytes. + b.writeBits(0b11111110, 8) + b.writeBits(uint64(val), 56) default: - b.writeBits(0b11111, 5) + b.writeBits(0b11111111, 8) // Worst case, needs 9 bytes. b.writeBits(uint64(val), 64) } } @@ -85,7 +98,7 @@ func putVarbitInt(b *bstream, val int64) { // readVarbitInt reads an int64 encoced with putVarbitInt. func readVarbitInt(b *bstreamReader) (int64, error) { var d byte - for i := 0; i < 5; i++ { + for i := 0; i < 8; i++ { d <<= 1 bit, err := b.readBitFast() if err != nil { @@ -114,7 +127,13 @@ func readVarbitInt(b *bstreamReader) (int64, error) { sz = 9 case 0b11110: sz = 12 - case 0b11111: + case 0b111110: + sz = 18 + case 0b1111110: + sz = 25 + case 0b11111110: + sz = 56 + case 0b11111111: // Do not use fast because it's very unlikely it will succeed. bits, err := b.readBits(64) if err != nil { @@ -122,6 +141,8 @@ func readVarbitInt(b *bstreamReader) (int64, error) { } val = int64(bits) + default: + return 0, errors.Errorf("invalid bit pattern %b", d) } if sz != 0 { @@ -141,3 +162,104 @@ func readVarbitInt(b *bstreamReader) (int64, error) { return val, nil } + +func bitRangeUint(x uint64, nbits int) bool { + return bits.LeadingZeros64(x) >= 64-nbits +} + +// putVarbitUint writes a uint64 using varbit encoding. It uses the same bit +// buckets as putVarbitInt. +func putVarbitUint(b *bstream, val uint64) { + switch { + case val == 0: // Precisely 0, needs 1 bit. + b.writeBit(zero) + case bitRangeUint(val, 3): // val <= 7, needs 5 bits. + b.writeBits(0b10, 2) + b.writeBits(val, 3) + case bitRangeUint(val, 6): // val <= 63, 9 bits. + b.writeBits(0b110, 3) + b.writeBits(val, 6) + case bitRangeUint(val, 9): // val <= 511, 13 bits. + b.writeBits(0b1110, 4) + b.writeBits(val, 9) + case bitRangeUint(val, 12): // val <= 4095, 17 bits. + b.writeBits(0b11110, 5) + b.writeBits(val, 12) + case bitRangeUint(val, 18): // val <= 262143, 3 bytes. + b.writeBits(0b111110, 6) + b.writeBits(val, 18) + case bitRangeUint(val, 25): // val <= 33554431, 4 bytes. + b.writeBits(0b1111110, 7) + b.writeBits(val, 25) + case bitRangeUint(val, 56): // val <= 72057594037927935, 8 bytes. + b.writeBits(0b11111110, 8) + b.writeBits(val, 56) + default: + b.writeBits(0b11111111, 8) // Worst case, needs 9 bytes. + b.writeBits(val, 64) + } +} + +// readVarbitUint reads a uint64 encoced with putVarbitUint. +func readVarbitUint(b *bstreamReader) (uint64, error) { + var d byte + for i := 0; i < 8; i++ { + d <<= 1 + bit, err := b.readBitFast() + if err != nil { + bit, err = b.readBit() + } + if err != nil { + return 0, err + } + if bit == zero { + break + } + d |= 1 + } + + var ( + bits uint64 + sz uint8 + err error + ) + + switch d { + case 0b0: + // val == 0 + case 0b10: + sz = 3 + case 0b110: + sz = 6 + case 0b1110: + sz = 9 + case 0b11110: + sz = 12 + case 0b111110: + sz = 18 + case 0b1111110: + sz = 25 + case 0b11111110: + sz = 56 + case 0b11111111: + // Do not use fast because it's very unlikely it will succeed. + bits, err = b.readBits(64) + if err != nil { + return 0, err + } + default: + return 0, errors.Errorf("invalid bit pattern %b", d) + } + + if sz != 0 { + bits, err = b.readBitsFast(sz) + if err != nil { + bits, err = b.readBits(sz) + } + if err != nil { + return 0, err + } + } + + return bits, nil +} diff --git a/tsdb/chunkenc/varbit_test.go b/tsdb/chunkenc/varbit_test.go new file mode 100644 index 000000000..8042b98dc --- /dev/null +++ b/tsdb/chunkenc/varbit_test.go @@ -0,0 +1,85 @@ +// Copyright 2021 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package chunkenc + +import ( + "math" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestVarbitInt(t *testing.T) { + numbers := []int64{ + math.MinInt64, + -36028797018963968, -36028797018963967, + -16777216, -16777215, + -131072, -131071, + -2048, -2047, + -256, -255, + -32, -31, + -4, -3, + -1, 0, 1, + 4, 5, + 32, 33, + 256, 257, + 2048, 2049, + 131072, 131073, + 16777216, 16777217, + 36028797018963968, 36028797018963969, + math.MaxInt64, + } + + bs := bstream{} + + for _, n := range numbers { + putVarbitInt(&bs, n) + } + + bsr := newBReader(bs.bytes()) + + for _, want := range numbers { + got, err := readVarbitInt(&bsr) + require.NoError(t, err) + require.Equal(t, want, got) + } +} + +func TestVarbitUint(t *testing.T) { + numbers := []uint64{ + 0, 1, + 7, 8, + 63, 64, + 511, 512, + 4095, 4096, + 262143, 262144, + 33554431, 33554432, + 72057594037927935, 72057594037927936, + math.MaxUint64, + } + + bs := bstream{} + + for _, n := range numbers { + putVarbitUint(&bs, n) + } + + bsr := newBReader(bs.bytes()) + + for _, want := range numbers { + got, err := readVarbitUint(&bsr) + require.NoError(t, err) + require.Equal(t, want, got) + } +}