Sparsehistogram: improve dod encoding, testing, encode chunk metadata (#9015)

* factor out different varbit schemes and include Beorn's "optimum" for buckets

Signed-off-by: Dieter Plaetinck <dieter@grafana.com>

* use more compact dod encoding scheme for SHS chunk columns

Signed-off-by: Dieter Plaetinck <dieter@grafana.com>

* remove FB VB and xor dod encoding because we won't use it

Signed-off-by: Dieter Plaetinck <dieter@grafana.com>

* HistoChunk metadata encoding

Signed-off-by: Dieter Plaetinck <dieter@grafana.com>

* add SparseHistogram.Copy()

Signed-off-by: Dieter Plaetinck <dieter@grafana.com>

* histogram test: test appending a few histograms

Signed-off-by: Dieter Plaetinck <dieter@grafana.com>

* add license headers

Signed-off-by: Dieter Plaetinck <dieter@grafana.com>
This commit is contained in:
Dieter Plaetinck 2021-06-30 13:45:43 +03:00 committed by GitHub
parent 04ad56d9b8
commit 4d27816ea5
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 351 additions and 147 deletions

View file

@ -25,3 +25,26 @@ type Span struct {
Offset int32
Length uint32
}
func (s SparseHistogram) Copy() SparseHistogram {
c := s
if s.PositiveSpans != nil {
c.PositiveSpans = make([]Span, len(s.PositiveSpans))
copy(c.PositiveSpans, s.PositiveSpans)
}
if s.NegativeSpans != nil {
c.NegativeSpans = make([]Span, len(s.NegativeSpans))
copy(c.NegativeSpans, s.NegativeSpans)
}
if s.PositiveBuckets != nil {
c.PositiveBuckets = make([]int64, len(s.PositiveBuckets))
copy(c.PositiveBuckets, s.PositiveBuckets)
}
if s.NegativeBuckets != nil {
c.NegativeBuckets = make([]int64, len(s.NegativeBuckets))
copy(c.NegativeBuckets, s.NegativeBuckets)
}
return c
}

View file

@ -75,10 +75,6 @@ const ()
type HistoChunk struct {
b bstream
// "metadata" describing all the data within this chunk
schema int32
posSpans, negSpans []histogram.Span
}
// NewHistoChunk returns a new chunk with Histo encoding of the given size.
@ -102,6 +98,16 @@ func (c *HistoChunk) NumSamples() int {
return int(binary.BigEndian.Uint16(c.Bytes()))
}
// Meta returns the histogram metadata.
// callers may only call this on chunks that have at least one sample
func (c *HistoChunk) Meta() (int32, []histogram.Span, []histogram.Span, error) {
if c.NumSamples() == 0 {
panic("HistoChunk.Meta() called on an empty chunk")
}
b := newBReader(c.Bytes()[2:])
return readHistoChunkMeta(&b)
}
func (c *HistoChunk) Compact() {
if l := len(c.b.stream); cap(c.b.stream) > l+chunkCompactCapacityThreshold {
buf := make([]byte, l)
@ -124,13 +130,11 @@ func (c *HistoChunk) Appender() (Appender, error) {
}
a := &histoAppender{
c: c,
b: &c.b,
schema: c.schema,
posSpans: c.posSpans,
negSpans: c.negSpans,
schema: it.schema,
posSpans: it.posSpans,
negSpans: it.negSpans,
t: it.t,
cnt: it.cnt,
zcnt: it.zcnt,
@ -154,8 +158,16 @@ func (c *HistoChunk) Appender() (Appender, error) {
return a, nil
}
// TODO fix this
func countSpans(spans []histogram.Span) int {
var cnt int
for _, s := range spans {
cnt += int(s.Length)
}
return cnt
}
func (c *HistoChunk) iterator(it Iterator) *histoIterator {
// TODO fix this. this is taken from xor.go
// Should iterators guarantee to act on a copy of the data so it doesn't lock append?
// When using striped locks to guard access to chunks, probably yes.
// Could only copy data if the chunk is not completed yet.
@ -164,29 +176,12 @@ func (c *HistoChunk) iterator(it Iterator) *histoIterator {
// return histoIter
//}
var numPosBuckets, numNegBuckets int
for _, s := range c.posSpans {
numPosBuckets += int(s.Length)
}
for _, s := range c.negSpans {
numNegBuckets += int(s.Length)
}
return &histoIterator{
// The first 2 bytes contain chunk headers.
// We skip that for actual samples.
br: newBReader(c.b.bytes()[2:]),
numTotal: binary.BigEndian.Uint16(c.b.bytes()),
t: math.MinInt64,
schema: c.schema,
posSpans: c.posSpans,
negSpans: c.negSpans,
posbuckets: make([]int64, numPosBuckets),
negbuckets: make([]int64, numNegBuckets),
posbucketsDelta: make([]int64, numPosBuckets),
negbucketsDelta: make([]int64, numNegBuckets),
}
}
@ -196,8 +191,6 @@ func (c *HistoChunk) Iterator(it Iterator) Iterator {
}
type histoAppender struct {
c *HistoChunk // this is such that during the first append we can set the metadata on the chunk. not sure if that's how it should work
b *bstream
// Meta
@ -233,27 +226,6 @@ func putUvarint(b *bstream, buf []byte, x uint64) {
}
}
// we use this for millisec timestamps and all counts
// for now this is copied from xor.go - we will probably want to be more conservative (use fewer bits for small values) - can be tweaked later
func putDod(b *bstream, dod int64) {
switch {
case dod == 0:
b.writeBit(zero)
case bitRange(dod, 14):
b.writeBits(0x02, 2) // '10'
b.writeBits(uint64(dod), 14)
case bitRange(dod, 17):
b.writeBits(0x06, 3) // '110'
b.writeBits(uint64(dod), 17)
case bitRange(dod, 20):
b.writeBits(0x0e, 4) // '1110'
b.writeBits(uint64(dod), 20)
default:
b.writeBits(0x0f, 4) // '1111'
b.writeBits(uint64(dod), 64)
}
}
func (a *histoAppender) Append(int64, float64) {
panic("cannot call histoAppender.Append().")
}
@ -265,14 +237,19 @@ func (a *histoAppender) AppendHistogram(t int64, h histogram.SparseHistogram) {
num := binary.BigEndian.Uint16(a.b.bytes())
if num == 0 {
// the first append gets the privilege to dictate the metadata, on both the appender and the chunk
// TODO we should probably not reach back into the chunk here. should metadata be set when we create the chunk?
a.c.schema = h.Schema
a.c.posSpans, a.c.negSpans = h.PositiveSpans, h.NegativeSpans
// the first append gets the privilege to dictate the metadata
// but it's also responsible for encoding it into the chunk!
writeHistoChunkMeta(a.b, h.Schema, h.PositiveSpans, h.NegativeSpans)
a.schema = h.Schema
a.posSpans, a.negSpans = h.PositiveSpans, h.NegativeSpans
numPosBuckets, numNegBuckets := countSpans(h.PositiveSpans), countSpans(h.NegativeSpans)
a.posbuckets = make([]int64, numPosBuckets)
a.negbuckets = make([]int64, numNegBuckets)
a.posbucketsDelta = make([]int64, numPosBuckets)
a.negbucketsDelta = make([]int64, numNegBuckets)
// now store actual data
putVarint(a.b, a.buf64, t)
putUvarint(a.b, a.buf64, h.Count)
putUvarint(a.b, a.buf64, h.ZeroCount)
@ -313,22 +290,22 @@ func (a *histoAppender) AppendHistogram(t int64, h histogram.SparseHistogram) {
cntDod := cntDelta - a.cntDelta
zcntDod := zcntDelta - a.zcntDelta
putDod(a.b, tDod)
putDod(a.b, cntDod)
putDod(a.b, zcntDod)
putInt64VBBucket(a.b, tDod)
putInt64VBBucket(a.b, cntDod)
putInt64VBBucket(a.b, zcntDod)
a.writeSumDelta(h.Sum)
for i, buck := range h.PositiveBuckets {
delta := buck - a.posbuckets[i]
dod := delta - a.posbucketsDelta[i]
putDod(a.b, dod)
putInt64VBBucket(a.b, dod)
a.posbucketsDelta[i] = delta
}
for i, buck := range h.NegativeBuckets {
delta := buck - a.negbuckets[i]
dod := delta - a.negbucketsDelta[i]
putDod(a.b, dod)
putInt64VBBucket(a.b, dod)
a.negbucketsDelta[i] = delta
}
}
@ -475,6 +452,23 @@ func (it *histoIterator) Next() bool {
}
if it.numRead == 0 {
// first read is responsible for reading chunk metadata and initializing fields that depend on it
schema, posSpans, negSpans, err := readHistoChunkMeta(&it.br)
if err != nil {
it.err = err
return false
}
it.schema = schema
it.posSpans, it.negSpans = posSpans, negSpans
numPosBuckets, numNegBuckets := countSpans(posSpans), countSpans(negSpans)
it.posbuckets = make([]int64, numPosBuckets)
it.negbuckets = make([]int64, numNegBuckets)
it.posbucketsDelta = make([]int64, numPosBuckets)
it.negbucketsDelta = make([]int64, numNegBuckets)
// now read actual data
t, err := binary.ReadVarint(&it.br)
if err != nil {
it.err = err
@ -577,45 +571,50 @@ func (it *histoIterator) Next() bool {
return true
}
tDod, ok := it.readDod()
if !ok {
return ok
tDod, err := readInt64VBBucket(&it.br)
if err != nil {
it.err = err
return false
}
it.tDelta = it.tDelta + tDod
it.t += it.tDelta
cntDod, ok := it.readDod()
if !ok {
return ok
cntDod, err := readInt64VBBucket(&it.br)
if err != nil {
it.err = err
return false
}
it.cntDelta = it.cntDelta + cntDod
it.cnt = uint64(int64(it.cnt) + it.cntDelta)
zcntDod, ok := it.readDod()
if !ok {
return ok
zcntDod, err := readInt64VBBucket(&it.br)
if err != nil {
it.err = err
return false
}
it.zcntDelta = it.zcntDelta + zcntDod
it.zcnt = uint64(int64(it.zcnt) + it.zcntDelta)
ok = it.readSum()
ok := it.readSum()
if !ok {
return false
}
for i := range it.posbuckets {
dod, ok := it.readDod()
if !ok {
return ok
dod, err := readInt64VBBucket(&it.br)
if err != nil {
it.err = err
return false
}
it.posbucketsDelta[i] = it.posbucketsDelta[i] + dod
it.posbuckets[i] = it.posbuckets[i] + it.posbucketsDelta[i]
}
for i := range it.negbuckets {
dod, ok := it.readDod()
if !ok {
return ok
dod, err := readInt64VBBucket(&it.br)
if err != nil {
it.err = err
return false
}
it.negbucketsDelta[i] = it.negbucketsDelta[i] + dod
it.negbuckets[i] = it.negbuckets[i] + it.negbucketsDelta[i]
@ -624,66 +623,6 @@ func (it *histoIterator) Next() bool {
return true
}
func (it *histoIterator) readDod() (int64, bool) {
var d byte
// read delta-of-delta
for i := 0; i < 4; i++ {
d <<= 1
bit, err := it.br.readBitFast()
if err != nil {
bit, err = it.br.readBit()
}
if err != nil {
it.err = err
return 0, false
}
if bit == zero {
break
}
d |= 1
}
var sz uint8
var dod int64
switch d {
case 0x00:
// dod == 0
case 0x02:
sz = 14
case 0x06:
sz = 17
case 0x0e:
sz = 20
case 0x0f:
// Do not use fast because it's very unlikely it will succeed.
bits, err := it.br.readBits(64)
if err != nil {
it.err = err
return 0, false
}
dod = int64(bits)
}
if sz != 0 {
bits, err := it.br.readBitsFast(sz)
if err != nil {
bits, err = it.br.readBits(sz)
}
if err != nil {
it.err = err
return 0, false
}
if bits > (1 << (sz - 1)) {
// or something
bits = bits - (1 << sz)
}
dod = int64(bits)
}
return dod, true
}
func (it *histoIterator) readSum() bool {
bit, err := it.br.readBitFast()
if err != nil {

View file

@ -0,0 +1,82 @@
// Copyright 2021 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// The code in this file was largely written by Damian Gryski as part of
// https://github.com/dgryski/go-tsz and published under the license below.
// It was modified to accommodate reading from byte slices without modifying
// the underlying bytes, which would panic when reading from mmap'd
// read-only byte slices.
package chunkenc
import "github.com/prometheus/prometheus/pkg/histogram"
func writeHistoChunkMeta(b *bstream, schema int32, posSpans, negSpans []histogram.Span) {
putInt64VBBucket(b, int64(schema))
putHistoChunkMetaSpans(b, posSpans)
putHistoChunkMetaSpans(b, negSpans)
}
func putHistoChunkMetaSpans(b *bstream, spans []histogram.Span) {
putInt64VBBucket(b, int64(len(spans)))
for _, s := range spans {
putInt64VBBucket(b, int64(s.Length))
putInt64VBBucket(b, int64(s.Offset))
}
}
func readHistoChunkMeta(b *bstreamReader) (int32, []histogram.Span, []histogram.Span, error) {
v, err := readInt64VBBucket(b)
if err != nil {
return 0, nil, nil, err
}
schema := int32(v)
posSpans, err := readHistoChunkMetaSpans(b)
if err != nil {
return 0, nil, nil, err
}
negSpans, err := readHistoChunkMetaSpans(b)
if err != nil {
return 0, nil, nil, err
}
return schema, posSpans, negSpans, nil
}
func readHistoChunkMetaSpans(b *bstreamReader) ([]histogram.Span, error) {
var spans []histogram.Span
num, err := readInt64VBBucket(b)
if err != nil {
return nil, err
}
for i := 0; i < int(num); i++ {
length, err := readInt64VBBucket(b)
if err != nil {
return nil, err
}
offset, err := readInt64VBBucket(b)
if err != nil {
return nil, err
}
spans = append(spans, histogram.Span{
Length: uint32(length),
Offset: int32(offset),
})
}
return spans, nil
}

View file

@ -47,8 +47,8 @@ func TestHistoChunkSameBuckets(t *testing.T) {
{Offset: 0, Length: 2},
{Offset: 1, Length: 2},
},
NegativeSpans: []histogram.Span{},
PositiveBuckets: []int64{1, 1, -1, 0},
PositiveBuckets: []int64{1, 1, -1, 0}, // counts: 1, 2, 1, 1 (total 5)
NegativeSpans: nil,
NegativeBuckets: []int64{},
}
@ -59,17 +59,35 @@ func TestHistoChunkSameBuckets(t *testing.T) {
{t: ts, h: h},
}
// TODO add an update
// h.Count = 9
// h.Sum = 61
// add an updated histogram
// TODO add update with new appender
// Start with a new appender every 10th sample. This emulates starting
// appending to a partially filled chunk.
// app, err = c.Appender()
// require.NoError(t, err)
ts += 16
h.Count += 9
h.ZeroCount++
h.Sum = 24.4
h.PositiveBuckets = []int64{5, -2, 1, -2} // counts: 5, 3, 4, 2 (total 14)
// app.Append(ts, v)
app.AppendHistogram(ts, h)
exp = append(exp, res{t: ts, h: h})
require.Equal(t, c.NumSamples(), 2)
// add update with new appender
app, err = c.Appender()
require.NoError(t, err)
require.Equal(t, c.NumSamples(), 2)
ts += 14
h.Count += 13
h.ZeroCount += 2
h.Sum = 24.4
h.PositiveBuckets = []int64{6, 1, -3, 6} // counts: 6, 7, 4, 10 (total 27)
app.AppendHistogram(ts, h)
exp = append(exp, res{t: ts, h: h})
require.Equal(t, c.NumSamples(), 3)
// 1. Expand iterator in simple case.
it1 := c.iterator(nil)
@ -77,7 +95,7 @@ func TestHistoChunkSameBuckets(t *testing.T) {
var res1 []res
for it1.Next() {
ts, h := it1.AtHistogram()
res1 = append(res1, res{t: ts, h: h})
res1 = append(res1, res{t: ts, h: h.Copy()})
}
require.NoError(t, it1.Err())
require.Equal(t, exp, res1)

View file

@ -0,0 +1,128 @@
// Copyright 2021 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// The code in this file was largely written by Damian Gryski as part of
// https://github.com/dgryski/go-tsz and published under the license below.
// It was modified to accommodate reading from byte slices without modifying
// the underlying bytes, which would panic when reading from mmap'd
// read-only byte slices.
// Copyright (c) 2015,2016 Damian Gryski <damian@gryski.com>
// All rights reserved.
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are met:
// * Redistributions of source code must retain the above copyright notice,
// this list of conditions and the following disclaimer.
//
// * Redistributions in binary form must reproduce the above copyright notice,
// this list of conditions and the following disclaimer in the documentation
// and/or other materials provided with the distribution.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
// FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
// DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
// CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
// OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
package chunkenc
// putInt64VBBucket writes an int64 using varbit optimized for SHS buckets
// note: we could improve this further: each branch doesn't need to support any values of any of the prior branches, so we can expand the range of each branch. do more with fewer bits
func putInt64VBBucket(b *bstream, val int64) {
switch {
case val == 0:
b.writeBit(zero)
case bitRange(val, 3): // -3 <= val <= 4
b.writeBits(0x02, 2) // '10'
b.writeBits(uint64(val), 3)
case bitRange(val, 6): // -31 <= val <= 32
b.writeBits(0x06, 3) // '110'
b.writeBits(uint64(val), 6)
case bitRange(val, 9): // -255 <= val <= 256
b.writeBits(0x0e, 4) // '1110'
b.writeBits(uint64(val), 9)
case bitRange(val, 12): // -2047 <= val <= 2048
b.writeBits(0x1e, 5) // '11110'
b.writeBits(uint64(val), 12)
default:
b.writeBits(0x3e, 5) // '11111'
b.writeBits(uint64(val), 64)
}
}
// readInt64VBBucket reads an int64 using varbit optimized for SHS buckets
func readInt64VBBucket(b *bstreamReader) (int64, error) {
var d byte
for i := 0; i < 5; i++ {
d <<= 1
bit, err := b.readBitFast()
if err != nil {
bit, err = b.readBit()
}
if err != nil {
return 0, err
}
if bit == zero {
break
}
d |= 1
}
var val int64
var sz uint8
switch d {
case 0x00:
// val == 0
case 0x02: // '10'
sz = 3
case 0x06: // '110'
sz = 6
case 0x0e: // '1110'
sz = 9
case 0x1e: // '11110'
sz = 12
case 0x3e: // '11111'
// Do not use fast because it's very unlikely it will succeed.
bits, err := b.readBits(64)
if err != nil {
return 0, err
}
val = int64(bits)
}
if sz != 0 {
bits, err := b.readBitsFast(sz)
if err != nil {
bits, err = b.readBits(sz)
}
if err != nil {
return 0, err
}
if bits > (1 << (sz - 1)) {
// or something
bits = bits - (1 << sz)
}
val = int64(bits)
}
return val, nil
}

View file

@ -33,3 +33,17 @@ in-file offset (lower 4 bytes) and segment sequence number (upper 4 bytes).
│ len <uvarint> │ encoding <1 byte> │ data <bytes> │ CRC32 <4 byte>
└───────────────┴───────────────────┴──────────────┴────────────────┘
```
## Histogram chunk
```
┌──────────────┬─────────────────┬──────────────────────────┬──────────────────────────┬──────────────┐
│ len <uint16> │ schema <varint> │ pos-spans <span-section> │ neg-spans <span-section> │ data <bytes>
└──────────────┴─────────────────┴──────────────────────────┴──────────────────────────┴──────────────┘
span-section:
┌──────────────┬──────────────────┬──────────────────┬────────────┐
│ len <varint> │ length1 <varint> │ offset1 <varint> │ length2... │
└──────────────┴──────────────────┴──────────────────┴────────────┘
```