From 19e98e5469c15c15e182f3a5ab8ceeeca43a9ee3 Mon Sep 17 00:00:00 2001 From: Ganesh Vernekar <15064823+codesome@users.noreply.github.com> Date: Fri, 6 Aug 2021 18:08:41 +0530 Subject: [PATCH] Support storing the zero threshold in the histogram chunk (#9165) Signed-off-by: Ganesh Vernekar --- tsdb/chunkenc/histo.go | 16 +++++++----- tsdb/chunkenc/histo_meta.go | 23 +++++++++++------ tsdb/chunkenc/histo_test.go | 20 +++++++-------- tsdb/chunkenc/varbit_buckets.go | 44 ++++++++++++++++++++++++--------- 4 files changed, 68 insertions(+), 35 deletions(-) diff --git a/tsdb/chunkenc/histo.go b/tsdb/chunkenc/histo.go index a3a4abcf8d..039bc4dd3e 100644 --- a/tsdb/chunkenc/histo.go +++ b/tsdb/chunkenc/histo.go @@ -97,7 +97,7 @@ func (c *HistoChunk) NumSamples() int { // Meta returns the histogram metadata. // callers may only call this on chunks that have at least one sample -func (c *HistoChunk) Meta() (int32, []histogram.Span, []histogram.Span, error) { +func (c *HistoChunk) Meta() (int32, float64, []histogram.Span, []histogram.Span, error) { if c.NumSamples() == 0 { panic("HistoChunk.Meta() called on an empty chunk") } @@ -131,6 +131,7 @@ func (c *HistoChunk) Appender() (Appender, error) { b: &c.b, schema: it.schema, + zeroThreshold: it.zeroThreshold, posSpans: it.posSpans, negSpans: it.negSpans, t: it.t, @@ -199,6 +200,7 @@ type HistoAppender struct { // Metadata: schema int32 + zeroThreshold float64 posSpans, negSpans []histogram.Span // For the fields that are tracked as dod's. Note that we expect to @@ -245,8 +247,7 @@ func (a *HistoAppender) Append(int64, float64) {} // * the zerobucket threshold has changed // * any buckets disappeared func (a *HistoAppender) Appendable(h histogram.SparseHistogram) ([]Interjection, []Interjection, bool) { - // TODO zerothreshold - if h.Schema != a.schema { + if h.Schema != a.schema || h.ZeroThreshold != a.zeroThreshold { return nil, nil, false } posInterjections, ok := compareSpans(a.posSpans, h.PositiveSpans) @@ -273,8 +274,9 @@ func (a *HistoAppender) AppendHistogram(t int64, h histogram.SparseHistogram) { // the first append gets the privilege to dictate the metadata // but it's also responsible for encoding it into the chunk! - writeHistoChunkMeta(a.b, h.Schema, h.PositiveSpans, h.NegativeSpans) + writeHistoChunkMeta(a.b, h.Schema, h.ZeroThreshold, h.PositiveSpans, h.NegativeSpans) a.schema = h.Schema + a.zeroThreshold = h.ZeroThreshold a.posSpans, a.negSpans = h.PositiveSpans, h.NegativeSpans numPosBuckets, numNegBuckets := countSpans(h.PositiveSpans), countSpans(h.NegativeSpans) a.posbuckets = make([]int64, numPosBuckets) @@ -442,6 +444,7 @@ type histoIterator struct { // Metadata: schema int32 + zeroThreshold float64 posSpans, negSpans []histogram.Span // For the fields that are tracked as dod's. @@ -486,7 +489,7 @@ func (it *histoIterator) AtHistogram() (int64, histogram.SparseHistogram) { Count: it.cnt, ZeroCount: it.zcnt, Sum: it.sum, - ZeroThreshold: 0, // TODO + ZeroThreshold: it.zeroThreshold, Schema: it.schema, PositiveSpans: it.posSpans, NegativeSpans: it.negSpans, @@ -532,12 +535,13 @@ func (it *histoIterator) Next() bool { if it.numRead == 0 { // first read is responsible for reading chunk metadata and initializing fields that depend on it - schema, posSpans, negSpans, err := readHistoChunkMeta(&it.br) + schema, zeroThreshold, posSpans, negSpans, err := readHistoChunkMeta(&it.br) if err != nil { it.err = err return false } it.schema = schema + it.zeroThreshold = zeroThreshold it.posSpans, it.negSpans = posSpans, negSpans numPosBuckets, numNegBuckets := countSpans(posSpans), countSpans(negSpans) it.posbuckets = make([]int64, numPosBuckets) diff --git a/tsdb/chunkenc/histo_meta.go b/tsdb/chunkenc/histo_meta.go index 3010cfa394..e470fd8469 100644 --- a/tsdb/chunkenc/histo_meta.go +++ b/tsdb/chunkenc/histo_meta.go @@ -13,10 +13,13 @@ package chunkenc -import "github.com/prometheus/prometheus/pkg/histogram" +import ( + "github.com/prometheus/prometheus/pkg/histogram" +) -func writeHistoChunkMeta(b *bstream, schema int32, posSpans, negSpans []histogram.Span) { +func writeHistoChunkMeta(b *bstream, schema int32, zeroThreshold float64, posSpans, negSpans []histogram.Span) { putInt64VBBucket(b, int64(schema)) + putFloat64VBBucket(b, zeroThreshold) putHistoChunkMetaSpans(b, posSpans) putHistoChunkMetaSpans(b, negSpans) } @@ -29,25 +32,29 @@ func putHistoChunkMetaSpans(b *bstream, spans []histogram.Span) { } } -func readHistoChunkMeta(b *bstreamReader) (int32, []histogram.Span, []histogram.Span, error) { - +func readHistoChunkMeta(b *bstreamReader) (int32, float64, []histogram.Span, []histogram.Span, error) { v, err := readInt64VBBucket(b) if err != nil { - return 0, nil, nil, err + return 0, 0, nil, nil, err } schema := int32(v) + zeroThreshold, err := readFloat64VBBucket(b) + if err != nil { + return 0, 0, nil, nil, err + } + posSpans, err := readHistoChunkMetaSpans(b) if err != nil { - return 0, nil, nil, err + return 0, 0, nil, nil, err } negSpans, err := readHistoChunkMetaSpans(b) if err != nil { - return 0, nil, nil, err + return 0, 0, nil, nil, err } - return schema, posSpans, negSpans, nil + return schema, zeroThreshold, posSpans, negSpans, nil } func readHistoChunkMetaSpans(b *bstreamReader) ([]histogram.Span, error) { diff --git a/tsdb/chunkenc/histo_test.go b/tsdb/chunkenc/histo_test.go index ef7e518355..5a6e657ca8 100644 --- a/tsdb/chunkenc/histo_test.go +++ b/tsdb/chunkenc/histo_test.go @@ -31,11 +31,11 @@ func TestHistoChunkSameBuckets(t *testing.T) { ts := int64(1234567890) h := histogram.SparseHistogram{ - Count: 5, - ZeroCount: 2, - Sum: 18.4, - //ZeroThreshold: 1, TODO - Schema: 1, + Count: 5, + ZeroCount: 2, + Sum: 18.4, + ZeroThreshold: 1e-100, + Schema: 1, PositiveSpans: []histogram.Span{ {Offset: 0, Length: 2}, {Offset: 1, Length: 2}, @@ -129,11 +129,11 @@ func TestHistoChunkBucketChanges(t *testing.T) { ts1 := int64(1234567890) h1 := histogram.SparseHistogram{ - Count: 5, - ZeroCount: 2, - Sum: 18.4, - //ZeroThreshold: 1, TODO - Schema: 1, + Count: 5, + ZeroCount: 2, + Sum: 18.4, + ZeroThreshold: 1e-125, + Schema: 1, PositiveSpans: []histogram.Span{ {Offset: 0, Length: 2}, {Offset: 2, Length: 1}, diff --git a/tsdb/chunkenc/varbit_buckets.go b/tsdb/chunkenc/varbit_buckets.go index 80cbdcdd60..5bae350f6d 100644 --- a/tsdb/chunkenc/varbit_buckets.go +++ b/tsdb/chunkenc/varbit_buckets.go @@ -43,6 +43,28 @@ package chunkenc +import ( + "math" +) + +// putFloat64VBBucket writes a float64 using varbit optimized for SHS buckets. +// It does so by converting the underlying bits into an int64. +func putFloat64VBBucket(b *bstream, val float64) { + // TODO: Since this is used for the zero threshold, this almost always goes into the default + // bit range (i.e. using 5+64 bits). So we can consider skipping `putInt64VBBucket` and directly + // write the float and save 5 bits here. + putInt64VBBucket(b, int64(math.Float64bits(val))) +} + +// readFloat64VBBucket reads a float64 using varbit optimized for SHS buckets +func readFloat64VBBucket(b *bstreamReader) (float64, error) { + val, err := readInt64VBBucket(b) + if err != nil { + return 0, err + } + return math.Float64frombits(uint64(val)), nil +} + // putInt64VBBucket writes an int64 using varbit optimized for SHS buckets. // // TODO(Dieterbe): We could improve this further: Each branch doesn't need to @@ -55,19 +77,19 @@ func putInt64VBBucket(b *bstream, val int64) { case val == 0: b.writeBit(zero) case bitRange(val, 3): // -3 <= val <= 4 - b.writeBits(0x02, 2) // '10' + b.writeBits(0b10, 2) b.writeBits(uint64(val), 3) case bitRange(val, 6): // -31 <= val <= 32 - b.writeBits(0x06, 3) // '110' + b.writeBits(0b110, 3) b.writeBits(uint64(val), 6) case bitRange(val, 9): // -255 <= val <= 256 - b.writeBits(0x0e, 4) // '1110' + b.writeBits(0b1110, 4) b.writeBits(uint64(val), 9) case bitRange(val, 12): // -2047 <= val <= 2048 - b.writeBits(0x1e, 5) // '11110' + b.writeBits(0b11110, 5) b.writeBits(uint64(val), 12) default: - b.writeBits(0x3e, 5) // '11111' + b.writeBits(0b11111, 5) b.writeBits(uint64(val), 64) } } @@ -94,17 +116,17 @@ func readInt64VBBucket(b *bstreamReader) (int64, error) { var sz uint8 switch d { - case 0x00: + case 0b0: // val == 0 - case 0x02: // '10' + case 0b10: sz = 3 - case 0x06: // '110' + case 0b110: sz = 6 - case 0x0e: // '1110' + case 0b1110: sz = 9 - case 0x1e: // '11110' + case 0b11110: sz = 12 - case 0x3e: // '11111' + case 0b11111: // Do not use fast because it's very unlikely it will succeed. bits, err := b.readBits(64) if err != nil {