mirror of
https://github.com/prometheus/prometheus.git
synced 2025-01-12 14:27:27 -08:00
Add info about counter resets in chunk meta
Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com>
This commit is contained in:
parent
1dd22ed655
commit
eb9931e961
|
@ -82,7 +82,7 @@ type Chunk interface {
|
||||||
// Appender adds sample pairs to a chunk.
|
// Appender adds sample pairs to a chunk.
|
||||||
type Appender interface {
|
type Appender interface {
|
||||||
Append(int64, float64)
|
Append(int64, float64)
|
||||||
AppendHistogram(t int64, h histogram.SparseHistogram)
|
AppendHistogram(t int64, h histogram.SparseHistogram, counterReset bool)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Iterator is a simple iterator that can only get the next value.
|
// Iterator is a simple iterator that can only get the next value.
|
||||||
|
|
|
@ -77,7 +77,7 @@ type HistoChunk struct {
|
||||||
|
|
||||||
// NewHistoChunk returns a new chunk with Histo encoding of the given size.
|
// NewHistoChunk returns a new chunk with Histo encoding of the given size.
|
||||||
func NewHistoChunk() *HistoChunk {
|
func NewHistoChunk() *HistoChunk {
|
||||||
b := make([]byte, 2, 128)
|
b := make([]byte, 3, 128)
|
||||||
return &HistoChunk{b: bstream{stream: b, count: 0}}
|
return &HistoChunk{b: bstream{stream: b, count: 0}}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -98,7 +98,7 @@ func (c *HistoChunk) NumSamples() int {
|
||||||
|
|
||||||
// Meta returns the histogram metadata.
|
// Meta returns the histogram metadata.
|
||||||
// callers may only call this on chunks that have at least one sample
|
// callers may only call this on chunks that have at least one sample
|
||||||
func (c *HistoChunk) Meta() (int32, float64, []histogram.Span, []histogram.Span, error) {
|
func (c *HistoChunk) Meta() (bool, int32, float64, []histogram.Span, []histogram.Span, error) {
|
||||||
if c.NumSamples() == 0 {
|
if c.NumSamples() == 0 {
|
||||||
panic("HistoChunk.Meta() called on an empty chunk")
|
panic("HistoChunk.Meta() called on an empty chunk")
|
||||||
}
|
}
|
||||||
|
@ -106,6 +106,14 @@ func (c *HistoChunk) Meta() (int32, float64, []histogram.Span, []histogram.Span,
|
||||||
return readHistoChunkMeta(&b)
|
return readHistoChunkMeta(&b)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// CounterReset returns true if this new chunk was created because of a counter reset.
|
||||||
|
func (c *HistoChunk) CounterReset() bool {
|
||||||
|
if c.NumSamples() == 0 {
|
||||||
|
panic("HistoChunk.CounterReset() called on an empty chunk")
|
||||||
|
}
|
||||||
|
return (c.Bytes()[2] & counterResetMask) != 0
|
||||||
|
}
|
||||||
|
|
||||||
// Compact implements the Chunk interface.
|
// Compact implements the Chunk interface.
|
||||||
func (c *HistoChunk) Compact() {
|
func (c *HistoChunk) Compact() {
|
||||||
if l := len(c.b.stream); cap(c.b.stream) > l+chunkCompactCapacityThreshold {
|
if l := len(c.b.stream); cap(c.b.stream) > l+chunkCompactCapacityThreshold {
|
||||||
|
@ -200,6 +208,7 @@ type HistoAppender struct {
|
||||||
b *bstream
|
b *bstream
|
||||||
|
|
||||||
// Metadata:
|
// Metadata:
|
||||||
|
counterReset bool
|
||||||
schema int32
|
schema int32
|
||||||
zeroThreshold float64
|
zeroThreshold float64
|
||||||
posSpans, negSpans []histogram.Span
|
posSpans, negSpans []histogram.Span
|
||||||
|
@ -249,42 +258,43 @@ func (a *HistoAppender) Append(int64, float64) {}
|
||||||
// * any buckets disappeared
|
// * any buckets disappeared
|
||||||
// * there was a counter reset in the count of observations or in any bucket, including the zero bucket
|
// * there was a counter reset in the count of observations or in any bucket, including the zero bucket
|
||||||
// * the last sample in the chunk was stale while the current sample is not stale
|
// * the last sample in the chunk was stale while the current sample is not stale
|
||||||
|
// It returns an additional boolean set to true if it is not appendable because of a counter reset.
|
||||||
// If the given sample is stale, it will always return true.
|
// If the given sample is stale, it will always return true.
|
||||||
func (a *HistoAppender) Appendable(h histogram.SparseHistogram) ([]Interjection, []Interjection, bool) {
|
func (a *HistoAppender) Appendable(h histogram.SparseHistogram) ([]Interjection, []Interjection, bool, bool) {
|
||||||
if value.IsStaleNaN(h.Sum) {
|
if value.IsStaleNaN(h.Sum) {
|
||||||
// This is a stale sample whose buckets and spans don't matter.
|
// This is a stale sample whose buckets and spans don't matter.
|
||||||
return nil, nil, true
|
return nil, nil, true, false
|
||||||
}
|
}
|
||||||
if value.IsStaleNaN(a.sum) {
|
if value.IsStaleNaN(a.sum) {
|
||||||
// If the last sample was stale, then we can only accept stale samples in this chunk.
|
// If the last sample was stale, then we can only accept stale samples in this chunk.
|
||||||
return nil, nil, false
|
return nil, nil, false, false
|
||||||
}
|
}
|
||||||
|
|
||||||
if h.Schema != a.schema || h.ZeroThreshold != a.zeroThreshold {
|
if h.Schema != a.schema || h.ZeroThreshold != a.zeroThreshold {
|
||||||
return nil, nil, false
|
return nil, nil, false, false
|
||||||
}
|
}
|
||||||
posInterjections, ok := compareSpans(a.posSpans, h.PositiveSpans)
|
posInterjections, ok := compareSpans(a.posSpans, h.PositiveSpans)
|
||||||
if !ok {
|
if !ok {
|
||||||
return nil, nil, false
|
return nil, nil, false, false
|
||||||
}
|
}
|
||||||
negInterjections, ok := compareSpans(a.negSpans, h.NegativeSpans)
|
negInterjections, ok := compareSpans(a.negSpans, h.NegativeSpans)
|
||||||
if !ok {
|
if !ok {
|
||||||
return nil, nil, false
|
return nil, nil, false, false
|
||||||
}
|
}
|
||||||
|
|
||||||
if h.Count < a.cnt || h.ZeroCount < a.zcnt {
|
if h.Count < a.cnt || h.ZeroCount < a.zcnt {
|
||||||
// There has been a counter reset.
|
// There has been a counter reset.
|
||||||
return nil, nil, false
|
return nil, nil, false, false
|
||||||
}
|
}
|
||||||
|
|
||||||
if counterResetInAnyBucket(a.posbuckets, h.PositiveBuckets, a.posSpans, h.PositiveSpans) {
|
if counterResetInAnyBucket(a.posbuckets, h.PositiveBuckets, a.posSpans, h.PositiveSpans) {
|
||||||
return nil, nil, false
|
return nil, nil, false, true
|
||||||
}
|
}
|
||||||
if counterResetInAnyBucket(a.negbuckets, h.NegativeBuckets, a.negSpans, h.NegativeSpans) {
|
if counterResetInAnyBucket(a.negbuckets, h.NegativeBuckets, a.negSpans, h.NegativeSpans) {
|
||||||
return nil, nil, false
|
return nil, nil, false, true
|
||||||
}
|
}
|
||||||
|
|
||||||
return posInterjections, negInterjections, ok
|
return posInterjections, negInterjections, true, false
|
||||||
}
|
}
|
||||||
|
|
||||||
// counterResetInAnyBucket returns true if there was a counter reset for any bucket.
|
// counterResetInAnyBucket returns true if there was a counter reset for any bucket.
|
||||||
|
@ -358,7 +368,7 @@ func counterResetInAnyBucket(oldBuckets, newBuckets []int64, oldSpans, newSpans
|
||||||
// histogram is properly structured. E.g. that the number of pos/neg buckets
|
// histogram is properly structured. E.g. that the number of pos/neg buckets
|
||||||
// used corresponds to the number conveyed by the pos/neg span structures.
|
// used corresponds to the number conveyed by the pos/neg span structures.
|
||||||
// callers must call Appendable() first and act accordingly!
|
// callers must call Appendable() first and act accordingly!
|
||||||
func (a *HistoAppender) AppendHistogram(t int64, h histogram.SparseHistogram) {
|
func (a *HistoAppender) AppendHistogram(t int64, h histogram.SparseHistogram, counterReset bool) {
|
||||||
var tDelta, cntDelta, zcntDelta int64
|
var tDelta, cntDelta, zcntDelta int64
|
||||||
num := binary.BigEndian.Uint16(a.b.bytes())
|
num := binary.BigEndian.Uint16(a.b.bytes())
|
||||||
|
|
||||||
|
@ -368,12 +378,17 @@ func (a *HistoAppender) AppendHistogram(t int64, h histogram.SparseHistogram) {
|
||||||
h = histogram.SparseHistogram{Sum: h.Sum}
|
h = histogram.SparseHistogram{Sum: h.Sum}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if num != 0 && counterReset {
|
||||||
|
panic("got counterReset=true for partially filled chunk in HistoAppender.AppendHistogram")
|
||||||
|
}
|
||||||
|
|
||||||
switch num {
|
switch num {
|
||||||
case 0:
|
case 0:
|
||||||
// the first append gets the privilege to dictate the metadata
|
// the first append gets the privilege to dictate the metadata
|
||||||
// but it's also responsible for encoding it into the chunk!
|
// but it's also responsible for encoding it into the chunk!
|
||||||
|
|
||||||
writeHistoChunkMeta(a.b, h.Schema, h.ZeroThreshold, h.PositiveSpans, h.NegativeSpans)
|
writeHistoChunkMeta(a.b, counterReset, h.Schema, h.ZeroThreshold, h.PositiveSpans, h.NegativeSpans)
|
||||||
|
a.counterReset = true
|
||||||
a.schema = h.Schema
|
a.schema = h.Schema
|
||||||
a.zeroThreshold = h.ZeroThreshold
|
a.zeroThreshold = h.ZeroThreshold
|
||||||
a.posSpans, a.negSpans = h.PositiveSpans, h.NegativeSpans
|
a.posSpans, a.negSpans = h.PositiveSpans, h.NegativeSpans
|
||||||
|
@ -484,6 +499,7 @@ func (a *HistoAppender) Recode(posInterjections, negInterjections []Interjection
|
||||||
}
|
}
|
||||||
numPosBuckets, numNegBuckets := countSpans(posSpans), countSpans(negSpans)
|
numPosBuckets, numNegBuckets := countSpans(posSpans), countSpans(negSpans)
|
||||||
|
|
||||||
|
counterReset := a.counterReset
|
||||||
for it.Next() {
|
for it.Next() {
|
||||||
tOld, hOld := it.AtHistogram()
|
tOld, hOld := it.AtHistogram()
|
||||||
|
|
||||||
|
@ -502,7 +518,9 @@ func (a *HistoAppender) Recode(posInterjections, negInterjections []Interjection
|
||||||
if len(negInterjections) > 0 {
|
if len(negInterjections) > 0 {
|
||||||
hOld.NegativeBuckets = interject(hOld.NegativeBuckets, negBuckets, negInterjections)
|
hOld.NegativeBuckets = interject(hOld.NegativeBuckets, negBuckets, negInterjections)
|
||||||
}
|
}
|
||||||
app.AppendHistogram(tOld, hOld)
|
app.AppendHistogram(tOld, hOld, counterReset)
|
||||||
|
|
||||||
|
counterReset = false // We need it only for the first sample.
|
||||||
}
|
}
|
||||||
return hc, app
|
return hc, app
|
||||||
}
|
}
|
||||||
|
@ -643,7 +661,8 @@ func (it *histoIterator) Next() bool {
|
||||||
if it.numRead == 0 {
|
if it.numRead == 0 {
|
||||||
|
|
||||||
// first read is responsible for reading chunk metadata and initializing fields that depend on it
|
// first read is responsible for reading chunk metadata and initializing fields that depend on it
|
||||||
schema, zeroThreshold, posSpans, negSpans, err := readHistoChunkMeta(&it.br)
|
// We give counter reset info at chunk level, hence we discard it here.
|
||||||
|
_, schema, zeroThreshold, posSpans, negSpans, err := readHistoChunkMeta(&it.br)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
it.err = err
|
it.err = err
|
||||||
return false
|
return false
|
||||||
|
|
|
@ -17,7 +17,17 @@ import (
|
||||||
"github.com/prometheus/prometheus/pkg/histogram"
|
"github.com/prometheus/prometheus/pkg/histogram"
|
||||||
)
|
)
|
||||||
|
|
||||||
func writeHistoChunkMeta(b *bstream, schema int32, zeroThreshold float64, posSpans, negSpans []histogram.Span) {
|
const (
|
||||||
|
counterResetMask = 0b10000000
|
||||||
|
)
|
||||||
|
|
||||||
|
func writeHistoChunkMeta(b *bstream, counterReset bool, schema int32, zeroThreshold float64, posSpans, negSpans []histogram.Span) {
|
||||||
|
header := byte(0)
|
||||||
|
if counterReset {
|
||||||
|
header |= counterResetMask
|
||||||
|
}
|
||||||
|
b.bytes()[2] = header
|
||||||
|
|
||||||
putInt64VBBucket(b, int64(schema))
|
putInt64VBBucket(b, int64(schema))
|
||||||
putFloat64VBBucket(b, zeroThreshold)
|
putFloat64VBBucket(b, zeroThreshold)
|
||||||
putHistoChunkMetaSpans(b, posSpans)
|
putHistoChunkMetaSpans(b, posSpans)
|
||||||
|
@ -32,29 +42,36 @@ func putHistoChunkMetaSpans(b *bstream, spans []histogram.Span) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func readHistoChunkMeta(b *bstreamReader) (int32, float64, []histogram.Span, []histogram.Span, error) {
|
func readHistoChunkMeta(b *bstreamReader) (bool, int32, float64, []histogram.Span, []histogram.Span, error) {
|
||||||
|
header, err := b.ReadByte()
|
||||||
|
if err != nil {
|
||||||
|
return false, 0, 0, nil, nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
counterReset := (header & counterResetMask) != 0
|
||||||
|
|
||||||
v, err := readInt64VBBucket(b)
|
v, err := readInt64VBBucket(b)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, 0, nil, nil, err
|
return false, 0, 0, nil, nil, err
|
||||||
}
|
}
|
||||||
schema := int32(v)
|
schema := int32(v)
|
||||||
|
|
||||||
zeroThreshold, err := readFloat64VBBucket(b)
|
zeroThreshold, err := readFloat64VBBucket(b)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, 0, nil, nil, err
|
return false, 0, 0, nil, nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
posSpans, err := readHistoChunkMetaSpans(b)
|
posSpans, err := readHistoChunkMetaSpans(b)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, 0, nil, nil, err
|
return false, 0, 0, nil, nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
negSpans, err := readHistoChunkMetaSpans(b)
|
negSpans, err := readHistoChunkMetaSpans(b)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, 0, nil, nil, err
|
return false, 0, 0, nil, nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
return schema, zeroThreshold, posSpans, negSpans, nil
|
return counterReset, schema, zeroThreshold, posSpans, negSpans, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func readHistoChunkMetaSpans(b *bstreamReader) ([]histogram.Span, error) {
|
func readHistoChunkMetaSpans(b *bstreamReader) ([]histogram.Span, error) {
|
||||||
|
|
|
@ -42,7 +42,7 @@ func TestHistoChunkSameBuckets(t *testing.T) {
|
||||||
},
|
},
|
||||||
PositiveBuckets: []int64{1, 1, -1, 0}, // counts: 1, 2, 1, 1 (total 5)
|
PositiveBuckets: []int64{1, 1, -1, 0}, // counts: 1, 2, 1, 1 (total 5)
|
||||||
}
|
}
|
||||||
app.AppendHistogram(ts, h)
|
app.AppendHistogram(ts, h, false)
|
||||||
exp = append(exp, res{t: ts, h: h})
|
exp = append(exp, res{t: ts, h: h})
|
||||||
require.Equal(t, 1, c.NumSamples())
|
require.Equal(t, 1, c.NumSamples())
|
||||||
|
|
||||||
|
@ -52,7 +52,7 @@ func TestHistoChunkSameBuckets(t *testing.T) {
|
||||||
h.ZeroCount++
|
h.ZeroCount++
|
||||||
h.Sum = 24.4
|
h.Sum = 24.4
|
||||||
h.PositiveBuckets = []int64{5, -2, 1, -2} // counts: 5, 3, 4, 2 (total 14)
|
h.PositiveBuckets = []int64{5, -2, 1, -2} // counts: 5, 3, 4, 2 (total 14)
|
||||||
app.AppendHistogram(ts, h)
|
app.AppendHistogram(ts, h, false)
|
||||||
exp = append(exp, res{t: ts, h: h})
|
exp = append(exp, res{t: ts, h: h})
|
||||||
require.Equal(t, 2, c.NumSamples())
|
require.Equal(t, 2, c.NumSamples())
|
||||||
|
|
||||||
|
@ -65,7 +65,7 @@ func TestHistoChunkSameBuckets(t *testing.T) {
|
||||||
h.ZeroCount += 2
|
h.ZeroCount += 2
|
||||||
h.Sum = 24.4
|
h.Sum = 24.4
|
||||||
h.PositiveBuckets = []int64{6, 1, -3, 6} // counts: 6, 7, 4, 10 (total 27)
|
h.PositiveBuckets = []int64{6, 1, -3, 6} // counts: 6, 7, 4, 10 (total 27)
|
||||||
app.AppendHistogram(ts, h)
|
app.AppendHistogram(ts, h, false)
|
||||||
exp = append(exp, res{t: ts, h: h})
|
exp = append(exp, res{t: ts, h: h})
|
||||||
require.Equal(t, 3, c.NumSamples())
|
require.Equal(t, 3, c.NumSamples())
|
||||||
|
|
||||||
|
@ -142,7 +142,7 @@ func TestHistoChunkBucketChanges(t *testing.T) {
|
||||||
PositiveBuckets: []int64{6, -3, 0, -1, 2, 1, -4}, // counts: 6, 3, 3, 2, 4, 5, 1 (total 24)
|
PositiveBuckets: []int64{6, -3, 0, -1, 2, 1, -4}, // counts: 6, 3, 3, 2, 4, 5, 1 (total 24)
|
||||||
}
|
}
|
||||||
|
|
||||||
app.AppendHistogram(ts1, h1)
|
app.AppendHistogram(ts1, h1, false)
|
||||||
require.Equal(t, 1, c.NumSamples())
|
require.Equal(t, 1, c.NumSamples())
|
||||||
|
|
||||||
// Add a new histogram that has expanded buckets.
|
// Add a new histogram that has expanded buckets.
|
||||||
|
@ -163,12 +163,13 @@ func TestHistoChunkBucketChanges(t *testing.T) {
|
||||||
|
|
||||||
// This is how span changes will be handled.
|
// This is how span changes will be handled.
|
||||||
histoApp, _ := app.(*HistoAppender)
|
histoApp, _ := app.(*HistoAppender)
|
||||||
posInterjections, negInterjections, ok := histoApp.Appendable(h2)
|
posInterjections, negInterjections, ok, cr := histoApp.Appendable(h2)
|
||||||
require.Greater(t, len(posInterjections), 0)
|
require.Greater(t, len(posInterjections), 0)
|
||||||
require.Equal(t, 0, len(negInterjections))
|
require.Equal(t, 0, len(negInterjections))
|
||||||
require.True(t, ok) // Only new buckets came in.
|
require.True(t, ok) // Only new buckets came in.
|
||||||
|
require.False(t, cr)
|
||||||
c, app = histoApp.Recode(posInterjections, negInterjections, h2.PositiveSpans, h2.NegativeSpans)
|
c, app = histoApp.Recode(posInterjections, negInterjections, h2.PositiveSpans, h2.NegativeSpans)
|
||||||
app.AppendHistogram(ts2, h2)
|
app.AppendHistogram(ts2, h2, false)
|
||||||
|
|
||||||
require.Equal(t, 2, c.NumSamples())
|
require.Equal(t, 2, c.NumSamples())
|
||||||
|
|
||||||
|
@ -215,7 +216,7 @@ func TestHistoChunkAppendable(t *testing.T) {
|
||||||
PositiveBuckets: []int64{6, -3, 0, -1, 2, 1, -4}, // counts: 6, 3, 3, 2, 4, 5, 1 (total 24)
|
PositiveBuckets: []int64{6, -3, 0, -1, 2, 1, -4}, // counts: 6, 3, 3, 2, 4, 5, 1 (total 24)
|
||||||
}
|
}
|
||||||
|
|
||||||
app.AppendHistogram(ts, h1)
|
app.AppendHistogram(ts, h1, false)
|
||||||
require.Equal(t, 1, c.NumSamples())
|
require.Equal(t, 1, c.NumSamples())
|
||||||
|
|
||||||
{ // New histogram that has more buckets.
|
{ // New histogram that has more buckets.
|
||||||
|
@ -234,10 +235,11 @@ func TestHistoChunkAppendable(t *testing.T) {
|
||||||
h2.PositiveBuckets = []int64{7, -2, -4, 2, -2, -1, 2, 3, 0, -5, 1} // 7 5 1 3 1 0 2 5 5 0 1 (total 30)
|
h2.PositiveBuckets = []int64{7, -2, -4, 2, -2, -1, 2, 3, 0, -5, 1} // 7 5 1 3 1 0 2 5 5 0 1 (total 30)
|
||||||
|
|
||||||
histoApp, _ := app.(*HistoAppender)
|
histoApp, _ := app.(*HistoAppender)
|
||||||
posInterjections, negInterjections, ok := histoApp.Appendable(h2)
|
posInterjections, negInterjections, ok, cr := histoApp.Appendable(h2)
|
||||||
require.Greater(t, len(posInterjections), 0)
|
require.Greater(t, len(posInterjections), 0)
|
||||||
require.Equal(t, 0, len(negInterjections))
|
require.Equal(t, 0, len(negInterjections))
|
||||||
require.True(t, ok) // Only new buckets came in.
|
require.True(t, ok) // Only new buckets came in.
|
||||||
|
require.False(t, cr)
|
||||||
}
|
}
|
||||||
|
|
||||||
{ // New histogram that has a bucket missing.
|
{ // New histogram that has a bucket missing.
|
||||||
|
@ -252,10 +254,11 @@ func TestHistoChunkAppendable(t *testing.T) {
|
||||||
h2.PositiveBuckets = []int64{6, -3, -1, 2, 1, -4} // counts: 6, 3, 2, 4, 5, 1 (total 21)
|
h2.PositiveBuckets = []int64{6, -3, -1, 2, 1, -4} // counts: 6, 3, 2, 4, 5, 1 (total 21)
|
||||||
|
|
||||||
histoApp, _ := app.(*HistoAppender)
|
histoApp, _ := app.(*HistoAppender)
|
||||||
posInterjections, negInterjections, ok := histoApp.Appendable(h2)
|
posInterjections, negInterjections, ok, cr := histoApp.Appendable(h2)
|
||||||
require.Equal(t, 0, len(posInterjections))
|
require.Equal(t, 0, len(posInterjections))
|
||||||
require.Equal(t, 0, len(negInterjections))
|
require.Equal(t, 0, len(negInterjections))
|
||||||
require.False(t, ok) // Need to cut a new chunk.
|
require.False(t, ok) // Need to cut a new chunk.
|
||||||
|
require.False(t, cr)
|
||||||
}
|
}
|
||||||
|
|
||||||
{ // New histogram that has a counter reset while buckets are same.
|
{ // New histogram that has a counter reset while buckets are same.
|
||||||
|
@ -264,10 +267,11 @@ func TestHistoChunkAppendable(t *testing.T) {
|
||||||
h2.PositiveBuckets = []int64{6, -4, 1, -1, 2, 1, -4} // counts: 6, 2, 3, 2, 4, 5, 1 (total 23)
|
h2.PositiveBuckets = []int64{6, -4, 1, -1, 2, 1, -4} // counts: 6, 2, 3, 2, 4, 5, 1 (total 23)
|
||||||
|
|
||||||
histoApp, _ := app.(*HistoAppender)
|
histoApp, _ := app.(*HistoAppender)
|
||||||
posInterjections, negInterjections, ok := histoApp.Appendable(h2)
|
posInterjections, negInterjections, ok, cr := histoApp.Appendable(h2)
|
||||||
require.Equal(t, 0, len(posInterjections))
|
require.Equal(t, 0, len(posInterjections))
|
||||||
require.Equal(t, 0, len(negInterjections))
|
require.Equal(t, 0, len(negInterjections))
|
||||||
require.False(t, ok) // Need to cut a new chunk.
|
require.False(t, ok) // Need to cut a new chunk.
|
||||||
|
require.True(t, cr)
|
||||||
}
|
}
|
||||||
|
|
||||||
{ // New histogram that has a counter reset while new buckets were added.
|
{ // New histogram that has a counter reset while new buckets were added.
|
||||||
|
@ -284,10 +288,11 @@ func TestHistoChunkAppendable(t *testing.T) {
|
||||||
h2.PositiveBuckets = []int64{7, -2, -4, 2, -2, -1, 2, 3, 0, -5, 0} // 7 5 1 3 1 0 2 5 5 0 0 (total 29)
|
h2.PositiveBuckets = []int64{7, -2, -4, 2, -2, -1, 2, 3, 0, -5, 0} // 7 5 1 3 1 0 2 5 5 0 0 (total 29)
|
||||||
|
|
||||||
histoApp, _ := app.(*HistoAppender)
|
histoApp, _ := app.(*HistoAppender)
|
||||||
posInterjections, negInterjections, ok := histoApp.Appendable(h2)
|
posInterjections, negInterjections, ok, cr := histoApp.Appendable(h2)
|
||||||
require.Equal(t, 0, len(posInterjections))
|
require.Equal(t, 0, len(posInterjections))
|
||||||
require.Equal(t, 0, len(negInterjections))
|
require.Equal(t, 0, len(negInterjections))
|
||||||
require.False(t, ok) // Need to cut a new chunk.
|
require.False(t, ok) // Need to cut a new chunk.
|
||||||
|
require.True(t, cr)
|
||||||
}
|
}
|
||||||
|
|
||||||
{ // New histogram that has a counter reset while new buckets were added before the first bucket and reset on first bucket.
|
{ // New histogram that has a counter reset while new buckets were added before the first bucket and reset on first bucket.
|
||||||
|
@ -307,9 +312,10 @@ func TestHistoChunkAppendable(t *testing.T) {
|
||||||
h2.PositiveBuckets = []int64{1, 1, 3, -2, 0, -1, 2, 1, -4} // counts: 1, 2, 5, 3, 3, 2, 4, 5, 1 (total 26)
|
h2.PositiveBuckets = []int64{1, 1, 3, -2, 0, -1, 2, 1, -4} // counts: 1, 2, 5, 3, 3, 2, 4, 5, 1 (total 26)
|
||||||
|
|
||||||
histoApp, _ := app.(*HistoAppender)
|
histoApp, _ := app.(*HistoAppender)
|
||||||
posInterjections, negInterjections, ok := histoApp.Appendable(h2)
|
posInterjections, negInterjections, ok, cr := histoApp.Appendable(h2)
|
||||||
require.Equal(t, 0, len(posInterjections))
|
require.Equal(t, 0, len(posInterjections))
|
||||||
require.Equal(t, 0, len(negInterjections))
|
require.Equal(t, 0, len(negInterjections))
|
||||||
require.False(t, ok) // Need to cut a new chunk.
|
require.False(t, ok) // Need to cut a new chunk.
|
||||||
|
require.True(t, cr)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -150,7 +150,7 @@ type xorAppender struct {
|
||||||
trailing uint8
|
trailing uint8
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *xorAppender) AppendHistogram(t int64, h histogram.SparseHistogram) {
|
func (a *xorAppender) AppendHistogram(t int64, h histogram.SparseHistogram, counterReset bool) {
|
||||||
//panic("cannot call xorAppender.AppendHistogram().")
|
//panic("cannot call xorAppender.AppendHistogram().")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -605,15 +605,18 @@ func (s *memSeries) append(t int64, v float64, appendID uint64, chunkDiskMapper
|
||||||
// appendHistogram adds the sparse histogram.
|
// appendHistogram adds the sparse histogram.
|
||||||
// It is unsafe to call this concurrently with s.iterator(...) without holding the series lock.
|
// It is unsafe to call this concurrently with s.iterator(...) without holding the series lock.
|
||||||
func (s *memSeries) appendHistogram(t int64, sh histogram.SparseHistogram, appendID uint64, chunkDiskMapper *chunks.ChunkDiskMapper) (sampleInOrder, chunkCreated bool) {
|
func (s *memSeries) appendHistogram(t int64, sh histogram.SparseHistogram, appendID uint64, chunkDiskMapper *chunks.ChunkDiskMapper) (sampleInOrder, chunkCreated bool) {
|
||||||
|
// Head controls the execution of recoding, so that we own the proper chunk reference afterwards.
|
||||||
|
// We check for Appendable before appendPreprocessor because in case it ends up creating a new chunk,
|
||||||
|
// we need to know if there was also a counter reset or not to set the meta properly.
|
||||||
|
app, _ := s.app.(*chunkenc.HistoAppender)
|
||||||
|
posInterjections, negInterjections, ok, counterReset := app.Appendable(sh)
|
||||||
|
|
||||||
c, sampleInOrder, chunkCreated := s.appendPreprocessor(t, chunkenc.EncSHS, chunkDiskMapper)
|
c, sampleInOrder, chunkCreated := s.appendPreprocessor(t, chunkenc.EncSHS, chunkDiskMapper)
|
||||||
if !sampleInOrder {
|
if !sampleInOrder {
|
||||||
return sampleInOrder, chunkCreated
|
return sampleInOrder, chunkCreated
|
||||||
}
|
}
|
||||||
|
|
||||||
if !chunkCreated {
|
if !chunkCreated {
|
||||||
// Head controls the execution of recoding, so that we own the proper chunk reference afterwards
|
|
||||||
app, _ := s.app.(*chunkenc.HistoAppender)
|
|
||||||
posInterjections, negInterjections, ok := app.Appendable(sh)
|
|
||||||
// we have 3 cases here
|
// we have 3 cases here
|
||||||
// !ok -> we need to cut a new chunk
|
// !ok -> we need to cut a new chunk
|
||||||
// ok but we have interjections -> existing chunk needs recoding before we can append our histogram
|
// ok but we have interjections -> existing chunk needs recoding before we can append our histogram
|
||||||
|
@ -633,7 +636,7 @@ func (s *memSeries) appendHistogram(t int64, sh histogram.SparseHistogram, appen
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
s.app.AppendHistogram(t, sh)
|
s.app.AppendHistogram(t, sh, counterReset)
|
||||||
s.sparseHistogramSeries = true
|
s.sparseHistogramSeries = true
|
||||||
|
|
||||||
c.maxTime = t
|
c.maxTime = t
|
||||||
|
|
Loading…
Reference in a new issue