From 58917d1b76a833c54ebedb1fcda3122db22b3e6a Mon Sep 17 00:00:00 2001 From: Dieter Plaetinck Date: Tue, 29 Jun 2021 16:57:59 +0300 Subject: [PATCH] sparsehistogram: integer types and timestamp separation (#9014) * integer types and timestamp separation 1) unify types to int64. as suggested by beorn. we want to support counters going down (resets) even if we plan to create new chunks for now, in that case 2) histogram type doesn't know its own timestamp. include it separately in appending and iteration Signed-off-by: Dieter Plaetinck * correction: count and zeroCount to remain unsigned to make api more resilient and that's what we use in protobuf anyway Signed-off-by: Dieter Plaetinck * temp hack. Ganesh will fix Signed-off-by: Dieter Plaetinck --- pkg/histogram/sparse_histogram.go | 1 - tsdb/chunkenc/chunk.go | 2 +- tsdb/chunkenc/histo.go | 67 +++++++++++++++---------------- tsdb/chunkenc/histo_test.go | 21 +++++++--- tsdb/chunkenc/xor.go | 2 +- tsdb/head.go | 2 +- 6 files changed, 50 insertions(+), 45 deletions(-) diff --git a/pkg/histogram/sparse_histogram.go b/pkg/histogram/sparse_histogram.go index 6ea1cb76e0..0e4df93725 100644 --- a/pkg/histogram/sparse_histogram.go +++ b/pkg/histogram/sparse_histogram.go @@ -14,7 +14,6 @@ package histogram type SparseHistogram struct { - Ts int64 Count, ZeroCount uint64 Sum, ZeroThreshold float64 Schema int32 diff --git a/tsdb/chunkenc/chunk.go b/tsdb/chunkenc/chunk.go index 0eddb4ea5b..63d3a61111 100644 --- a/tsdb/chunkenc/chunk.go +++ b/tsdb/chunkenc/chunk.go @@ -73,7 +73,7 @@ type Chunk interface { // Appender adds sample pairs to a chunk. type Appender interface { Append(int64, float64) - AppendHistogram(h histogram.SparseHistogram) + AppendHistogram(t int64, h histogram.SparseHistogram) } // Iterator is a simple iterator that can only get the next value. diff --git a/tsdb/chunkenc/histo.go b/tsdb/chunkenc/histo.go index d5c6d1afb9..08f2070e64 100644 --- a/tsdb/chunkenc/histo.go +++ b/tsdb/chunkenc/histo.go @@ -206,9 +206,10 @@ type histoAppender struct { posSpans, negSpans []histogram.Span // for the fields that are tracked as dod's + // note that we expect to handle negative deltas (e.g. resets) by creating new chunks, we still want to support it in general hence signed integer types t int64 cnt, zcnt uint64 - tDelta, cntDelta, zcntDelta uint64 + tDelta, cntDelta, zcntDelta int64 posbuckets, negbuckets []int64 posbucketsDelta, negbucketsDelta []int64 @@ -260,8 +261,8 @@ func (a *histoAppender) Append(int64, float64) { // AppendHistogram appends a SparseHistogram to the chunk // we assume the histogram is properly structured. E.g. that the number pos/neg buckets used corresponds to the number conveyed by the pos/neg span structures -func (a *histoAppender) AppendHistogram(h histogram.SparseHistogram) { - var tDelta, cntDelta, zcntDelta uint64 +func (a *histoAppender) AppendHistogram(t int64, h histogram.SparseHistogram) { + var tDelta, cntDelta, zcntDelta int64 num := binary.BigEndian.Uint16(a.b.bytes()) if num == 0 { @@ -273,7 +274,7 @@ func (a *histoAppender) AppendHistogram(h histogram.SparseHistogram) { a.schema = h.Schema a.posSpans, a.negSpans = h.PositiveSpans, h.NegativeSpans - putVarint(a.b, a.buf64, h.Ts) + putVarint(a.b, a.buf64, t) putUvarint(a.b, a.buf64, h.Count) putUvarint(a.b, a.buf64, h.ZeroCount) a.b.writeBits(math.Float64bits(h.Sum), 64) @@ -284,16 +285,13 @@ func (a *histoAppender) AppendHistogram(h histogram.SparseHistogram) { putVarint(a.b, a.buf64, buck) } } else if num == 1 { - tDelta = uint64(h.Ts - a.t) + tDelta = t - a.t + cntDelta = int64(h.Count) - int64(a.cnt) + zcntDelta = int64(h.ZeroCount) - int64(a.zcnt) - // WARNING: we assume all counts go up. what guarantee do we have this is true? uint may underflow if not. - - cntDelta = h.Count - a.cnt - zcntDelta = h.ZeroCount - a.zcnt - - putUvarint(a.b, a.buf64, tDelta) - putUvarint(a.b, a.buf64, cntDelta) - putUvarint(a.b, a.buf64, zcntDelta) + putVarint(a.b, a.buf64, tDelta) + putVarint(a.b, a.buf64, cntDelta) + putVarint(a.b, a.buf64, zcntDelta) a.writeSumDelta(h.Sum) @@ -308,13 +306,13 @@ func (a *histoAppender) AppendHistogram(h histogram.SparseHistogram) { a.negbucketsDelta[i] = delta } } else { - tDelta = uint64(h.Ts - a.t) - cntDelta = h.Count - a.cnt - zcntDelta = h.ZeroCount - a.zcnt + tDelta = t - a.t + cntDelta = int64(h.Count) - int64(a.cnt) + zcntDelta = int64(h.ZeroCount) - int64(a.zcnt) - tDod := int64(tDelta - a.tDelta) - cntDod := int64(cntDelta - a.cntDelta) - zcntDod := int64(zcntDelta - a.zcntDelta) + tDod := tDelta - a.tDelta + cntDod := cntDelta - a.cntDelta + zcntDod := zcntDelta - a.zcntDelta putDod(a.b, tDod) putDod(a.b, cntDod) @@ -338,7 +336,7 @@ func (a *histoAppender) AppendHistogram(h histogram.SparseHistogram) { binary.BigEndian.PutUint16(a.b.bytes(), num+1) - a.t = h.Ts + a.t = t a.cnt = h.Count a.zcnt = h.ZeroCount a.tDelta = tDelta @@ -399,7 +397,7 @@ type histoIterator struct { // for the fields that are tracked as dod's t int64 cnt, zcnt uint64 - tDelta, cntDelta, zcntDelta uint64 + tDelta, cntDelta, zcntDelta int64 posbuckets, negbuckets []int64 posbucketsDelta, negbucketsDelta []int64 @@ -424,9 +422,8 @@ func (it *histoIterator) Seek(t int64) bool { } return true } -func (it *histoIterator) At() (h histogram.SparseHistogram) { - return histogram.SparseHistogram{ - Ts: it.t, +func (it *histoIterator) At() (t int64, h histogram.SparseHistogram) { + return it.t, histogram.SparseHistogram{ Count: it.cnt, ZeroCount: it.zcnt, Sum: it.sum, @@ -524,7 +521,7 @@ func (it *histoIterator) Next() bool { } if it.numRead == 1 { - tDelta, err := binary.ReadUvarint(&it.br) + tDelta, err := binary.ReadVarint(&it.br) if err != nil { it.err = err return false @@ -532,21 +529,21 @@ func (it *histoIterator) Next() bool { it.tDelta = tDelta it.t += int64(it.tDelta) - cntDelta, err := binary.ReadUvarint(&it.br) + cntDelta, err := binary.ReadVarint(&it.br) if err != nil { it.err = err return false } it.cntDelta = cntDelta - it.cnt += it.cntDelta + it.cnt = uint64(int64(it.cnt) + it.cntDelta) - zcntDelta, err := binary.ReadUvarint(&it.br) + zcntDelta, err := binary.ReadVarint(&it.br) if err != nil { it.err = err return false } it.zcntDelta = zcntDelta - it.zcnt += it.zcntDelta + it.zcnt = uint64(int64(it.zcnt) + it.zcntDelta) ok := it.readSum() if !ok { @@ -580,22 +577,22 @@ func (it *histoIterator) Next() bool { if !ok { return ok } - it.tDelta = uint64(int64(it.tDelta) + tDod) - it.t += int64(it.tDelta) + it.tDelta = it.tDelta + tDod + it.t += it.tDelta cntDod, ok := it.readDod() if !ok { return ok } - it.cntDelta = uint64(int64(it.cntDelta) + cntDod) - it.cnt += it.cntDelta + it.cntDelta = it.cntDelta + cntDod + it.cnt = uint64(int64(it.cnt) + it.cntDelta) zcntDod, ok := it.readDod() if !ok { return ok } - it.zcntDelta = uint64(int64(it.zcntDelta) + zcntDod) - it.zcnt += it.zcntDelta + it.zcntDelta = it.zcntDelta + zcntDod + it.zcnt = uint64(int64(it.zcnt) + it.zcntDelta) ok = it.readSum() if !ok { diff --git a/tsdb/chunkenc/histo_test.go b/tsdb/chunkenc/histo_test.go index c5e3b5b2be..907756a68c 100644 --- a/tsdb/chunkenc/histo_test.go +++ b/tsdb/chunkenc/histo_test.go @@ -24,12 +24,20 @@ func TestHistoChunkSameBuckets(t *testing.T) { c := NewHistoChunk() + type res struct { + t int64 + h histogram.SparseHistogram + } + + // create fresh appender and add the first histogram + app, err := c.Appender() require.NoError(t, err) require.Equal(t, c.NumSamples(), 0) + ts := int64(1234567890) + h := histogram.SparseHistogram{ - Ts: 1234567890, Count: 5, ZeroCount: 2, Sum: 18.4, @@ -44,11 +52,11 @@ func TestHistoChunkSameBuckets(t *testing.T) { NegativeBuckets: []int64{}, } - app.AppendHistogram(h) + app.AppendHistogram(ts, h) require.Equal(t, c.NumSamples(), 1) - exp := []histogram.SparseHistogram{ - h, + exp := []res{ + {t: ts, h: h}, } // TODO add an update @@ -66,9 +74,10 @@ func TestHistoChunkSameBuckets(t *testing.T) { // 1. Expand iterator in simple case. it1 := c.iterator(nil) require.NoError(t, it1.Err()) - var res1 []histogram.SparseHistogram + var res1 []res for it1.Next() { - res1 = append(res1, it1.At()) + ts, h := it1.At() + res1 = append(res1, res{t: ts, h: h}) } require.NoError(t, it1.Err()) require.Equal(t, exp, res1) diff --git a/tsdb/chunkenc/xor.go b/tsdb/chunkenc/xor.go index b46e888547..d1dc09f8f7 100644 --- a/tsdb/chunkenc/xor.go +++ b/tsdb/chunkenc/xor.go @@ -149,7 +149,7 @@ type xorAppender struct { trailing uint8 } -func (a *xorAppender) AppendHistogram(h histogram.SparseHistogram) { +func (a *xorAppender) AppendHistogram(t int64, h histogram.SparseHistogram) { panic("cannot call xorAppender.AppendHistogram().") } diff --git a/tsdb/head.go b/tsdb/head.go index 296fa56c04..42e4f86f08 100644 --- a/tsdb/head.go +++ b/tsdb/head.go @@ -1199,7 +1199,7 @@ func (a *initAppender) AppendHistogram(ref uint64, l labels.Labels, sh histogram if a.app != nil { return a.app.AppendHistogram(ref, l, sh) } - a.head.initTime(sh.Ts) + //a.head.initTime(sh.Ts) FIXME(ganesh) a.app = a.head.appender() return a.app.AppendHistogram(ref, l, sh)