Add a NotCounterReset flag

Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com>
This commit is contained in:
Ganesh Vernekar 2021-10-06 16:02:19 +05:30
parent a280b6c2da
commit 175ef4ebcf
No known key found for this signature in database
GPG key ID: 0F8729A5EB59B965
7 changed files with 79 additions and 48 deletions

View file

@ -82,7 +82,7 @@ type Chunk interface {
// Appender adds sample pairs to a chunk. // Appender adds sample pairs to a chunk.
type Appender interface { type Appender interface {
Append(int64, float64) Append(int64, float64)
AppendHistogram(t int64, h histogram.SparseHistogram, counterReset bool) AppendHistogram(t int64, h histogram.SparseHistogram)
} }
// Iterator is a simple iterator that can only get the next value. // Iterator is a simple iterator that can only get the next value.

View file

@ -98,7 +98,7 @@ func (c *HistoChunk) NumSamples() int {
// Meta returns the histogram metadata. // Meta returns the histogram metadata.
// callers may only call this on chunks that have at least one sample // callers may only call this on chunks that have at least one sample
func (c *HistoChunk) Meta() (bool, int32, float64, []histogram.Span, []histogram.Span, error) { func (c *HistoChunk) Meta() (int32, float64, []histogram.Span, []histogram.Span, error) {
if c.NumSamples() == 0 { if c.NumSamples() == 0 {
panic("HistoChunk.Meta() called on an empty chunk") panic("HistoChunk.Meta() called on an empty chunk")
} }
@ -106,6 +106,18 @@ func (c *HistoChunk) Meta() (bool, int32, float64, []histogram.Span, []histogram
return readHistoChunkMeta(&b) return readHistoChunkMeta(&b)
} }
// SetCounterReset sets the counter reset flag to 1 if the passed argument is true, 0 otherwise.
func (c *HistoChunk) SetCounterReset(counterReset bool) {
bytes := c.Bytes()
header := bytes[2]
if counterReset {
header |= counterResetMask
} else if (header & counterResetMask) != 0 {
header ^= counterResetMask
}
bytes[2] = header
}
// CounterReset returns true if this new chunk was created because of a counter reset. // CounterReset returns true if this new chunk was created because of a counter reset.
func (c *HistoChunk) CounterReset() bool { func (c *HistoChunk) CounterReset() bool {
if c.NumSamples() == 0 { if c.NumSamples() == 0 {
@ -114,6 +126,27 @@ func (c *HistoChunk) CounterReset() bool {
return (c.Bytes()[2] & counterResetMask) != 0 return (c.Bytes()[2] & counterResetMask) != 0
} }
// SetNotCounterReset sets the "not counter reset" flag to 1 if the passed argument is true, 0 otherwise.
func (c *HistoChunk) SetNotCounterReset(notCounterReset bool) {
bytes := c.Bytes()
header := bytes[2]
if notCounterReset {
header |= notCounterResetMask
} else if (header & notCounterResetMask) != 0 {
header ^= notCounterResetMask
}
bytes[2] = header
}
// NotCounterReset returns true if this new chunk definitely did not have counter reset
// from the earlier chunk.
func (c *HistoChunk) NotCounterReset() bool {
if c.NumSamples() == 0 {
panic("HistoChunk.NotCounterReset() called on an empty chunk")
}
return (c.Bytes()[2] & notCounterResetMask) != 0
}
// Compact implements the Chunk interface. // Compact implements the Chunk interface.
func (c *HistoChunk) Compact() { func (c *HistoChunk) Compact() {
if l := len(c.b.stream); cap(c.b.stream) > l+chunkCompactCapacityThreshold { if l := len(c.b.stream); cap(c.b.stream) > l+chunkCompactCapacityThreshold {
@ -208,7 +241,6 @@ type HistoAppender struct {
b *bstream b *bstream
// Metadata: // Metadata:
counterReset bool
schema int32 schema int32
zeroThreshold float64 zeroThreshold float64
posSpans, negSpans []histogram.Span posSpans, negSpans []histogram.Span
@ -260,6 +292,7 @@ func (a *HistoAppender) Append(int64, float64) {}
// * the last sample in the chunk was stale while the current sample is not stale // * the last sample in the chunk was stale while the current sample is not stale
// It returns an additional boolean set to true if it is not appendable because of a counter reset. // It returns an additional boolean set to true if it is not appendable because of a counter reset.
// If the given sample is stale, it will always return true. // If the given sample is stale, it will always return true.
// If counterReset is true, okToAppend MUST be false.
func (a *HistoAppender) Appendable(h histogram.SparseHistogram) (posInterjections []Interjection, negInterjections []Interjection, okToAppend bool, counterReset bool) { func (a *HistoAppender) Appendable(h histogram.SparseHistogram) (posInterjections []Interjection, negInterjections []Interjection, okToAppend bool, counterReset bool) {
if value.IsStaleNaN(h.Sum) { if value.IsStaleNaN(h.Sum) {
// This is a stale sample whose buckets and spans don't matter. // This is a stale sample whose buckets and spans don't matter.
@ -380,7 +413,7 @@ func counterResetInAnyBucket(oldBuckets, newBuckets []int64, oldSpans, newSpans
// histogram is properly structured. E.g. that the number of pos/neg buckets // histogram is properly structured. E.g. that the number of pos/neg buckets
// used corresponds to the number conveyed by the pos/neg span structures. // used corresponds to the number conveyed by the pos/neg span structures.
// callers must call Appendable() first and act accordingly! // callers must call Appendable() first and act accordingly!
func (a *HistoAppender) AppendHistogram(t int64, h histogram.SparseHistogram, counterReset bool) { func (a *HistoAppender) AppendHistogram(t int64, h histogram.SparseHistogram) {
var tDelta, cntDelta, zcntDelta int64 var tDelta, cntDelta, zcntDelta int64
num := binary.BigEndian.Uint16(a.b.bytes()) num := binary.BigEndian.Uint16(a.b.bytes())
@ -390,17 +423,12 @@ func (a *HistoAppender) AppendHistogram(t int64, h histogram.SparseHistogram, co
h = histogram.SparseHistogram{Sum: h.Sum} h = histogram.SparseHistogram{Sum: h.Sum}
} }
if num != 0 && counterReset {
panic("got counterReset=true for partially filled chunk in HistoAppender.AppendHistogram")
}
switch num { switch num {
case 0: case 0:
// the first append gets the privilege to dictate the metadata // the first append gets the privilege to dictate the metadata
// but it's also responsible for encoding it into the chunk! // but it's also responsible for encoding it into the chunk!
writeHistoChunkMeta(a.b, counterReset, h.Schema, h.ZeroThreshold, h.PositiveSpans, h.NegativeSpans) writeHistoChunkMeta(a.b, h.Schema, h.ZeroThreshold, h.PositiveSpans, h.NegativeSpans)
a.counterReset = true
a.schema = h.Schema a.schema = h.Schema
a.zeroThreshold = h.ZeroThreshold a.zeroThreshold = h.ZeroThreshold
a.posSpans, a.negSpans = h.PositiveSpans, h.NegativeSpans a.posSpans, a.negSpans = h.PositiveSpans, h.NegativeSpans
@ -503,7 +531,8 @@ func (a *HistoAppender) Recode(posInterjections, negInterjections []Interjection
// it again with the new span layout. This can probably be done in-place // it again with the new span layout. This can probably be done in-place
// by editing the chunk. But let's first see how expensive it is in the // by editing the chunk. But let's first see how expensive it is in the
// big picture. // big picture.
it := newHistoIterator(a.b.bytes()) byts := a.b.bytes()
it := newHistoIterator(byts)
hc := NewHistoChunk() hc := NewHistoChunk()
app, err := hc.Appender() app, err := hc.Appender()
if err != nil { if err != nil {
@ -511,7 +540,6 @@ func (a *HistoAppender) Recode(posInterjections, negInterjections []Interjection
} }
numPosBuckets, numNegBuckets := countSpans(posSpans), countSpans(negSpans) numPosBuckets, numNegBuckets := countSpans(posSpans), countSpans(negSpans)
counterReset := a.counterReset
for it.Next() { for it.Next() {
tOld, hOld := it.AtHistogram() tOld, hOld := it.AtHistogram()
@ -530,10 +558,12 @@ func (a *HistoAppender) Recode(posInterjections, negInterjections []Interjection
if len(negInterjections) > 0 { if len(negInterjections) > 0 {
hOld.NegativeBuckets = interject(hOld.NegativeBuckets, negBuckets, negInterjections) hOld.NegativeBuckets = interject(hOld.NegativeBuckets, negBuckets, negInterjections)
} }
app.AppendHistogram(tOld, hOld, counterReset) app.AppendHistogram(tOld, hOld)
counterReset = false // We need it only for the first sample.
} }
// Set the flags.
hc.SetCounterReset(byts[2]&counterResetMask != 0)
hc.SetNotCounterReset(byts[2]&notCounterResetMask != 0)
return hc, app return hc, app
} }
@ -674,7 +704,7 @@ func (it *histoIterator) Next() bool {
// first read is responsible for reading chunk metadata and initializing fields that depend on it // first read is responsible for reading chunk metadata and initializing fields that depend on it
// We give counter reset info at chunk level, hence we discard it here. // We give counter reset info at chunk level, hence we discard it here.
_, schema, zeroThreshold, posSpans, negSpans, err := readHistoChunkMeta(&it.br) schema, zeroThreshold, posSpans, negSpans, err := readHistoChunkMeta(&it.br)
if err != nil { if err != nil {
it.err = err it.err = err
return false return false

View file

@ -19,15 +19,10 @@ import (
const ( const (
counterResetMask = 0b10000000 counterResetMask = 0b10000000
notCounterResetMask = 0b01000000
) )
func writeHistoChunkMeta(b *bstream, counterReset bool, schema int32, zeroThreshold float64, posSpans, negSpans []histogram.Span) { func writeHistoChunkMeta(b *bstream, schema int32, zeroThreshold float64, posSpans, negSpans []histogram.Span) {
header := byte(0)
if counterReset {
header |= counterResetMask
}
b.bytes()[2] = header
putInt64VBBucket(b, int64(schema)) putInt64VBBucket(b, int64(schema))
putFloat64VBBucket(b, zeroThreshold) putFloat64VBBucket(b, zeroThreshold)
putHistoChunkMetaSpans(b, posSpans) putHistoChunkMetaSpans(b, posSpans)
@ -42,15 +37,12 @@ func putHistoChunkMetaSpans(b *bstream, spans []histogram.Span) {
} }
} }
func readHistoChunkMeta(b *bstreamReader) (counterReset bool, schema int32, zeroThreshold float64, posSpans []histogram.Span, negSpans []histogram.Span, err error) { func readHistoChunkMeta(b *bstreamReader) (schema int32, zeroThreshold float64, posSpans []histogram.Span, negSpans []histogram.Span, err error) {
var header byte _, err = b.ReadByte() // The header.
header, err = b.ReadByte()
if err != nil { if err != nil {
return return
} }
counterReset = (header & counterResetMask) != 0
v, err := readInt64VBBucket(b) v, err := readInt64VBBucket(b)
if err != nil { if err != nil {
return return

View file

@ -42,7 +42,7 @@ func TestHistoChunkSameBuckets(t *testing.T) {
}, },
PositiveBuckets: []int64{1, 1, -1, 0}, // counts: 1, 2, 1, 1 (total 5) PositiveBuckets: []int64{1, 1, -1, 0}, // counts: 1, 2, 1, 1 (total 5)
} }
app.AppendHistogram(ts, h, false) app.AppendHistogram(ts, h)
exp = append(exp, res{t: ts, h: h}) exp = append(exp, res{t: ts, h: h})
require.Equal(t, 1, c.NumSamples()) require.Equal(t, 1, c.NumSamples())
@ -52,7 +52,7 @@ func TestHistoChunkSameBuckets(t *testing.T) {
h.ZeroCount++ h.ZeroCount++
h.Sum = 24.4 h.Sum = 24.4
h.PositiveBuckets = []int64{5, -2, 1, -2} // counts: 5, 3, 4, 2 (total 14) h.PositiveBuckets = []int64{5, -2, 1, -2} // counts: 5, 3, 4, 2 (total 14)
app.AppendHistogram(ts, h, false) app.AppendHistogram(ts, h)
exp = append(exp, res{t: ts, h: h}) exp = append(exp, res{t: ts, h: h})
require.Equal(t, 2, c.NumSamples()) require.Equal(t, 2, c.NumSamples())
@ -65,7 +65,7 @@ func TestHistoChunkSameBuckets(t *testing.T) {
h.ZeroCount += 2 h.ZeroCount += 2
h.Sum = 24.4 h.Sum = 24.4
h.PositiveBuckets = []int64{6, 1, -3, 6} // counts: 6, 7, 4, 10 (total 27) h.PositiveBuckets = []int64{6, 1, -3, 6} // counts: 6, 7, 4, 10 (total 27)
app.AppendHistogram(ts, h, false) app.AppendHistogram(ts, h)
exp = append(exp, res{t: ts, h: h}) exp = append(exp, res{t: ts, h: h})
require.Equal(t, 3, c.NumSamples()) require.Equal(t, 3, c.NumSamples())
@ -142,7 +142,7 @@ func TestHistoChunkBucketChanges(t *testing.T) {
PositiveBuckets: []int64{6, -3, 0, -1, 2, 1, -4}, // counts: 6, 3, 3, 2, 4, 5, 1 (total 24) PositiveBuckets: []int64{6, -3, 0, -1, 2, 1, -4}, // counts: 6, 3, 3, 2, 4, 5, 1 (total 24)
} }
app.AppendHistogram(ts1, h1, false) app.AppendHistogram(ts1, h1)
require.Equal(t, 1, c.NumSamples()) require.Equal(t, 1, c.NumSamples())
// Add a new histogram that has expanded buckets. // Add a new histogram that has expanded buckets.
@ -169,7 +169,7 @@ func TestHistoChunkBucketChanges(t *testing.T) {
require.True(t, ok) // Only new buckets came in. require.True(t, ok) // Only new buckets came in.
require.False(t, cr) require.False(t, cr)
c, app = histoApp.Recode(posInterjections, negInterjections, h2.PositiveSpans, h2.NegativeSpans) c, app = histoApp.Recode(posInterjections, negInterjections, h2.PositiveSpans, h2.NegativeSpans)
app.AppendHistogram(ts2, h2, false) app.AppendHistogram(ts2, h2)
require.Equal(t, 2, c.NumSamples()) require.Equal(t, 2, c.NumSamples())
@ -216,7 +216,7 @@ func TestHistoChunkAppendable(t *testing.T) {
PositiveBuckets: []int64{6, -3, 0, -1, 2, 1, -4}, // counts: 6, 3, 3, 2, 4, 5, 1 (total 24) PositiveBuckets: []int64{6, -3, 0, -1, 2, 1, -4}, // counts: 6, 3, 3, 2, 4, 5, 1 (total 24)
} }
app.AppendHistogram(ts, h1, false) app.AppendHistogram(ts, h1)
require.Equal(t, 1, c.NumSamples()) require.Equal(t, 1, c.NumSamples())
{ // New histogram that has more buckets. { // New histogram that has more buckets.

View file

@ -150,7 +150,7 @@ type xorAppender struct {
trailing uint8 trailing uint8
} }
func (a *xorAppender) AppendHistogram(t int64, h histogram.SparseHistogram, counterReset bool) { func (a *xorAppender) AppendHistogram(t int64, h histogram.SparseHistogram) {
//panic("cannot call xorAppender.AppendHistogram().") //panic("cannot call xorAppender.AppendHistogram().")
} }

View file

@ -611,10 +611,10 @@ func (s *memSeries) appendHistogram(t int64, sh histogram.SparseHistogram, appen
app, _ := s.app.(*chunkenc.HistoAppender) app, _ := s.app.(*chunkenc.HistoAppender)
var ( var (
posInterjections, negInterjections []chunkenc.Interjection posInterjections, negInterjections []chunkenc.Interjection
ok, counterReset bool okToAppend, counterReset bool
) )
if app != nil { if app != nil {
posInterjections, negInterjections, ok, counterReset = app.Appendable(sh) posInterjections, negInterjections, okToAppend, counterReset = app.Appendable(sh)
} }
c, sampleInOrder, chunkCreated := s.appendPreprocessor(t, chunkenc.EncSHS, chunkDiskMapper) c, sampleInOrder, chunkCreated := s.appendPreprocessor(t, chunkenc.EncSHS, chunkDiskMapper)
@ -623,11 +623,11 @@ func (s *memSeries) appendHistogram(t int64, sh histogram.SparseHistogram, appen
} }
if !chunkCreated { if !chunkCreated {
// we have 3 cases here // We have 3 cases here
// !ok -> we need to cut a new chunk // !okToAppend -> we need to cut a new chunk
// ok but we have interjections -> existing chunk needs recoding before we can append our histogram // okToAppend but we have interjections -> existing chunk needs recoding before we can append our histogram
// ok and no interjections -> chunk is ready to support our histogram // okToAppend and no interjections -> chunk is ready to support our histogram
if !ok { if !okToAppend || counterReset {
c = s.cutNewHeadChunk(t, chunkenc.EncSHS, chunkDiskMapper) c = s.cutNewHeadChunk(t, chunkenc.EncSHS, chunkDiskMapper)
chunkCreated = true chunkCreated = true
} else if len(posInterjections) > 0 || len(negInterjections) > 0 { } else if len(posInterjections) > 0 || len(negInterjections) > 0 {
@ -642,7 +642,16 @@ func (s *memSeries) appendHistogram(t int64, sh histogram.SparseHistogram, appen
} }
} }
s.app.AppendHistogram(t, sh, counterReset) if chunkCreated {
hc := s.headChunk.chunk.(*chunkenc.HistoChunk)
if counterReset {
hc.SetCounterReset(true)
} else if okToAppend {
hc.SetNotCounterReset(true)
}
}
s.app.AppendHistogram(t, sh)
s.sparseHistogramSeries = true s.sparseHistogramSeries = true
c.maxTime = t c.maxTime = t

View file

@ -717,16 +717,16 @@ func (p *populateWithDelChunkSeriesIterator) Next() bool {
h histogram.SparseHistogram h histogram.SparseHistogram
) )
if p.currDelIter.ChunkEncoding() == chunkenc.EncSHS { if p.currDelIter.ChunkEncoding() == chunkenc.EncSHS {
counterReset := false
if hc, ok := p.currChkMeta.Chunk.(*chunkenc.HistoChunk); ok { if hc, ok := p.currChkMeta.Chunk.(*chunkenc.HistoChunk); ok {
counterReset = hc.CounterReset() newChunk.(*chunkenc.HistoChunk).SetCounterReset(hc.CounterReset())
newChunk.(*chunkenc.HistoChunk).SetNotCounterReset(hc.NotCounterReset())
} }
t, h = p.currDelIter.AtHistogram() t, h = p.currDelIter.AtHistogram()
p.curr.MinTime = t p.curr.MinTime = t
app.AppendHistogram(t, h.Copy(), counterReset) app.AppendHistogram(t, h.Copy())
for p.currDelIter.Next() { for p.currDelIter.Next() {
t, h = p.currDelIter.AtHistogram() t, h = p.currDelIter.AtHistogram()
app.AppendHistogram(t, h.Copy(), false) app.AppendHistogram(t, h.Copy())
} }
} else { } else {
t, v = p.currDelIter.At() t, v = p.currDelIter.At()