mirror of
https://github.com/prometheus/prometheus.git
synced 2025-01-26 13:11:11 -08:00
Merge pull request #9216 from codesome/counterresets
Cut a new chunk on counter resets for any bucket
This commit is contained in:
commit
3636ff5b35
|
@ -246,6 +246,7 @@ func (a *HistoAppender) Append(int64, float64) {}
|
|||
// * the schema has changed
|
||||
// * the zerobucket threshold has changed
|
||||
// * any buckets disappeared
|
||||
// * there was a counter reset in the count of observations or in any bucket, including the zero bucket
|
||||
func (a *HistoAppender) Appendable(h histogram.SparseHistogram) ([]Interjection, []Interjection, bool) {
|
||||
if h.Schema != a.schema || h.ZeroThreshold != a.zeroThreshold {
|
||||
return nil, nil, false
|
||||
|
@ -258,9 +259,89 @@ func (a *HistoAppender) Appendable(h histogram.SparseHistogram) ([]Interjection,
|
|||
if !ok {
|
||||
return nil, nil, false
|
||||
}
|
||||
|
||||
if h.Count < a.cnt || h.ZeroCount < a.zcnt {
|
||||
// There has been a counter reset.
|
||||
return nil, nil, false
|
||||
}
|
||||
|
||||
if counterResetInAnyBucket(a.posbuckets, h.PositiveBuckets, a.posSpans, h.PositiveSpans) {
|
||||
return nil, nil, false
|
||||
}
|
||||
if counterResetInAnyBucket(a.negbuckets, h.NegativeBuckets, a.negSpans, h.NegativeSpans) {
|
||||
return nil, nil, false
|
||||
}
|
||||
|
||||
return posInterjections, negInterjections, ok
|
||||
}
|
||||
|
||||
// counterResetInAnyBucket returns true if there was a counter reset for any bucket.
|
||||
// This should be called only when buckets are same or new buckets were added,
|
||||
// and does not handle the case of buckets missing.
|
||||
func counterResetInAnyBucket(oldBuckets, newBuckets []int64, oldSpans, newSpans []histogram.Span) bool {
|
||||
if len(oldSpans) == 0 || len(oldBuckets) == 0 {
|
||||
return false
|
||||
}
|
||||
|
||||
oldSpanSliceIdx, newSpanSliceIdx := 0, 0 // Index for the span slices.
|
||||
oldInsideSpanIdx, newInsideSpanIdx := uint32(0), uint32(0) // Index inside a span.
|
||||
oldIdx, newIdx := oldSpans[0].Offset, newSpans[0].Offset
|
||||
|
||||
oldBucketSliceIdx, newBucketSliceIdx := 0, 0 // Index inside bucket slice.
|
||||
oldVal, newVal := oldBuckets[0], newBuckets[0]
|
||||
|
||||
// Since we assume that new spans won't have missing buckets, there will never be a case
|
||||
// where the old index will not find a matching new index.
|
||||
for {
|
||||
if oldIdx == newIdx {
|
||||
if newVal < oldVal {
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
if oldIdx <= newIdx {
|
||||
// Moving ahead old bucket and span by 1 index.
|
||||
if oldInsideSpanIdx == oldSpans[oldSpanSliceIdx].Length-1 {
|
||||
// Current span is over.
|
||||
oldSpanSliceIdx++
|
||||
oldInsideSpanIdx = 0
|
||||
if oldSpanSliceIdx >= len(oldSpans) {
|
||||
// All old spans are over.
|
||||
break
|
||||
}
|
||||
oldIdx += 1 + oldSpans[oldSpanSliceIdx].Offset
|
||||
} else {
|
||||
oldInsideSpanIdx++
|
||||
oldIdx++
|
||||
}
|
||||
oldBucketSliceIdx++
|
||||
oldVal += oldBuckets[oldBucketSliceIdx]
|
||||
}
|
||||
|
||||
if oldIdx > newIdx {
|
||||
// Moving ahead new bucket and span by 1 index.
|
||||
if newInsideSpanIdx == newSpans[newSpanSliceIdx].Length-1 {
|
||||
// Current span is over.
|
||||
newSpanSliceIdx++
|
||||
newInsideSpanIdx = 0
|
||||
if newSpanSliceIdx >= len(newSpans) {
|
||||
// All new spans are over.
|
||||
// This should not happen, old spans above should catch this first.
|
||||
panic("new spans over before old spans in counterReset")
|
||||
}
|
||||
newIdx += 1 + newSpans[newSpanSliceIdx].Offset
|
||||
} else {
|
||||
newInsideSpanIdx++
|
||||
newIdx++
|
||||
}
|
||||
newBucketSliceIdx++
|
||||
newVal += newBuckets[newBucketSliceIdx]
|
||||
}
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
// AppendHistogram appends a SparseHistogram to the chunk. We assume the
|
||||
// histogram is properly structured. E.g. that the number of pos/neg buckets
|
||||
// used corresponds to the number conveyed by the pos/neg span structures.
|
||||
|
|
|
@ -193,3 +193,129 @@ func TestHistoChunkBucketChanges(t *testing.T) {
|
|||
require.NoError(t, it.Err())
|
||||
require.Equal(t, exp, act)
|
||||
}
|
||||
|
||||
func TestHistoChunkAppendable(t *testing.T) {
|
||||
c := Chunk(NewHistoChunk())
|
||||
|
||||
// Create fresh appender and add the first histogram.
|
||||
app, err := c.Appender()
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 0, c.NumSamples())
|
||||
|
||||
ts := int64(1234567890)
|
||||
h1 := histogram.SparseHistogram{
|
||||
Count: 5,
|
||||
ZeroCount: 2,
|
||||
Sum: 18.4,
|
||||
ZeroThreshold: 1e-125,
|
||||
Schema: 1,
|
||||
PositiveSpans: []histogram.Span{
|
||||
{Offset: 0, Length: 2},
|
||||
{Offset: 2, Length: 1},
|
||||
{Offset: 3, Length: 2},
|
||||
{Offset: 3, Length: 1},
|
||||
{Offset: 1, Length: 1},
|
||||
},
|
||||
PositiveBuckets: []int64{6, -3, 0, -1, 2, 1, -4}, // counts: 6, 3, 3, 2, 4, 5, 1 (total 24)
|
||||
NegativeSpans: nil,
|
||||
NegativeBuckets: []int64{},
|
||||
}
|
||||
|
||||
app.AppendHistogram(ts, h1)
|
||||
require.Equal(t, 1, c.NumSamples())
|
||||
|
||||
{ // New histogram that has more buckets.
|
||||
h2 := h1
|
||||
h2.PositiveSpans = []histogram.Span{
|
||||
{Offset: 0, Length: 3},
|
||||
{Offset: 1, Length: 1},
|
||||
{Offset: 1, Length: 4},
|
||||
{Offset: 3, Length: 3},
|
||||
}
|
||||
h2.Count += 9
|
||||
h2.ZeroCount++
|
||||
h2.Sum = 30
|
||||
// Existing histogram should get values converted from the above to: 6 3 0 3 0 0 2 4 5 0 1 (previous values with some new empty buckets in between)
|
||||
// so the new histogram should have new counts >= these per-bucket counts, e.g.:
|
||||
h2.PositiveBuckets = []int64{7, -2, -4, 2, -2, -1, 2, 3, 0, -5, 1} // 7 5 1 3 1 0 2 5 5 0 1 (total 30)
|
||||
|
||||
histoApp, _ := app.(*HistoAppender)
|
||||
posInterjections, negInterjections, ok := histoApp.Appendable(h2)
|
||||
require.Greater(t, len(posInterjections), 0)
|
||||
require.Equal(t, 0, len(negInterjections))
|
||||
require.True(t, ok) // Only new buckets came in.
|
||||
}
|
||||
|
||||
{ // New histogram that has a bucket missing.
|
||||
h2 := h1
|
||||
h2.PositiveSpans = []histogram.Span{
|
||||
{Offset: 0, Length: 2},
|
||||
{Offset: 5, Length: 2},
|
||||
{Offset: 3, Length: 1},
|
||||
{Offset: 1, Length: 1},
|
||||
}
|
||||
h2.Sum = 21
|
||||
h2.PositiveBuckets = []int64{6, -3, -1, 2, 1, -4} // counts: 6, 3, 2, 4, 5, 1 (total 21)
|
||||
|
||||
histoApp, _ := app.(*HistoAppender)
|
||||
posInterjections, negInterjections, ok := histoApp.Appendable(h2)
|
||||
require.Equal(t, 0, len(posInterjections))
|
||||
require.Equal(t, 0, len(negInterjections))
|
||||
require.False(t, ok) // Need to cut a new chunk.
|
||||
}
|
||||
|
||||
{ // New histogram that has a counter reset while buckets are same.
|
||||
h2 := h1
|
||||
h2.Sum = 23
|
||||
h2.PositiveBuckets = []int64{6, -4, 1, -1, 2, 1, -4} // counts: 6, 2, 3, 2, 4, 5, 1 (total 23)
|
||||
|
||||
histoApp, _ := app.(*HistoAppender)
|
||||
posInterjections, negInterjections, ok := histoApp.Appendable(h2)
|
||||
require.Equal(t, 0, len(posInterjections))
|
||||
require.Equal(t, 0, len(negInterjections))
|
||||
require.False(t, ok) // Need to cut a new chunk.
|
||||
}
|
||||
|
||||
{ // New histogram that has a counter reset while new buckets were added.
|
||||
h2 := h1
|
||||
h2.PositiveSpans = []histogram.Span{
|
||||
{Offset: 0, Length: 3},
|
||||
{Offset: 1, Length: 1},
|
||||
{Offset: 1, Length: 4},
|
||||
{Offset: 3, Length: 3},
|
||||
}
|
||||
h2.Sum = 29
|
||||
// Existing histogram should get values converted from the above to: 6 3 0 3 0 0 2 4 5 0 1 (previous values with some new empty buckets in between)
|
||||
// so the new histogram should have new counts >= these per-bucket counts, e.g.:
|
||||
h2.PositiveBuckets = []int64{7, -2, -4, 2, -2, -1, 2, 3, 0, -5, 0} // 7 5 1 3 1 0 2 5 5 0 0 (total 29)
|
||||
|
||||
histoApp, _ := app.(*HistoAppender)
|
||||
posInterjections, negInterjections, ok := histoApp.Appendable(h2)
|
||||
require.Equal(t, 0, len(posInterjections))
|
||||
require.Equal(t, 0, len(negInterjections))
|
||||
require.False(t, ok) // Need to cut a new chunk.
|
||||
}
|
||||
|
||||
{ // New histogram that has a counter reset while new buckets were added before the first bucket and reset on first bucket.
|
||||
// (to catch the edge case where the new bucket should be forwarded ahead until first old bucket at start)
|
||||
h2 := h1
|
||||
h2.PositiveSpans = []histogram.Span{
|
||||
{Offset: -3, Length: 2},
|
||||
{Offset: 1, Length: 2},
|
||||
{Offset: 2, Length: 1},
|
||||
{Offset: 3, Length: 2},
|
||||
{Offset: 3, Length: 1},
|
||||
{Offset: 1, Length: 1},
|
||||
}
|
||||
h2.Sum = 26
|
||||
// Existing histogram should get values converted from the above to: 0, 0, 6, 3, 3, 2, 4, 5, 1
|
||||
// so the new histogram should have new counts >= these per-bucket counts, e.g.:
|
||||
h2.PositiveBuckets = []int64{1, 1, 3, -2, 0, -1, 2, 1, -4} // counts: 1, 2, 5, 3, 3, 2, 4, 5, 1 (total 26)
|
||||
|
||||
histoApp, _ := app.(*HistoAppender)
|
||||
posInterjections, negInterjections, ok := histoApp.Appendable(h2)
|
||||
require.Equal(t, 0, len(posInterjections))
|
||||
require.Equal(t, 0, len(negInterjections))
|
||||
require.False(t, ok) // Need to cut a new chunk.
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue