Fix re-code histogram and chunk re-code conflict

Signed-off-by: György Krajcsovits <gyorgy.krajcsovits@grafana.com>
This commit is contained in:
György Krajcsovits 2024-08-06 13:08:10 +02:00
parent aff089a014
commit 1b6d1366d8
5 changed files with 120 additions and 16 deletions

View file

@ -419,6 +419,7 @@ loop:
// fill in the bucket in b and advance a.
if aCount == 0 {
bInter.num++ // Mark that we need to insert a bucket in b.
bInter.bucketIdx = aIdx
// Advance a
if aInter.num > 0 {
aInserts = append(aInserts, aInter)
@ -436,6 +437,7 @@ loop:
return nil, nil, false
case aIdx > bIdx: // a misses a value that is in b. Forward b and recompare.
aInter.num++
bInter.bucketIdx = bIdx
// Advance b
if bInter.num > 0 {
bInserts = append(bInserts, bInter)
@ -453,6 +455,7 @@ loop:
// fill in the bucket in b and advance a.
if aCount == 0 {
bInter.num++
bInter.bucketIdx = aIdx
// Advance a
if aInter.num > 0 {
aInserts = append(aInserts, aInter)
@ -471,6 +474,7 @@ loop:
return nil, nil, false
case !aOK && bOK: // a misses a value that is in b. Forward b and recompare.
aInter.num++
bInter.bucketIdx = bIdx
// Advance b
if bInter.num > 0 {
bInserts = append(bInserts, bInter)
@ -773,6 +777,22 @@ func (a *FloatHistogramAppender) AppendFloatHistogram(prev *FloatHistogramAppend
happ.appendFloatHistogram(t, h)
return newChunk, false, app, nil
}
if len(pBackwardInserts) > 0 || len(nBackwardInserts) > 0 {
// The histogram needs to be expanded to have the extra empty buckets
// of the chunk.
if len(pForwardInserts) == 0 && len(nForwardInserts) == 0 {
// No new chunks from the histogram, so the spans of the appender can accommodate the new buckets.
h.PositiveSpans = resize(h.PositiveSpans, len(a.pSpans))
copy(h.PositiveSpans, a.pSpans)
h.NegativeSpans = resize(h.NegativeSpans, len(a.nSpans))
copy(h.NegativeSpans, a.nSpans)
} else {
// Spans need pre-adjusting to accommodate the new buckets.
h.PositiveSpans = adjustForInserts(h.PositiveSpans, pBackwardInserts)
h.NegativeSpans = adjustForInserts(h.NegativeSpans, nBackwardInserts)
}
a.recodeHistogram(h, pBackwardInserts, nBackwardInserts)
}
if len(pForwardInserts) > 0 || len(nForwardInserts) > 0 {
if appendOnly {
return nil, false, a, fmt.Errorf("float histogram layout change with %d positive and %d negative forwards inserts", len(pForwardInserts), len(nForwardInserts))
@ -784,13 +804,6 @@ func (a *FloatHistogramAppender) AppendFloatHistogram(prev *FloatHistogramAppend
app.(*FloatHistogramAppender).appendFloatHistogram(t, h)
return chk, true, app, nil
}
if len(pBackwardInserts) > 0 || len(nBackwardInserts) > 0 {
// The histogram needs to be expanded to have the extra empty buckets
// of the chunk.
h.PositiveSpans = a.pSpans
h.NegativeSpans = a.nSpans
a.recodeHistogram(h, pBackwardInserts, nBackwardInserts)
}
a.appendFloatHistogram(t, h)
return nil, false, a, nil
}

View file

@ -453,7 +453,13 @@ func TestFloatHistogramChunkAppendable(t *testing.T) {
// Check that h2 was recoded.
require.Equal(t, []float64{7, 0, 4, 3, 5, 0, 2, 3}, h2.PositiveBuckets)
require.Equal(t, emptyBucketH.PositiveSpans, h2.PositiveSpans)
require.Equal(t, []histogram.Span{
{Offset: 0, Length: 2}, // Added empty bucket.
{Offset: 2, Length: 1}, // Existing - offset adjusted.
{Offset: 3, Length: 2}, // Existing.
{Offset: 3, Length: 1}, // Added empty bucket.
{Offset: 1, Length: 2}, // Existing + the extra bucket.
}, h2.PositiveSpans)
}
{ // New histogram that has a counter reset while buckets are same.

View file

@ -437,6 +437,7 @@ loop:
// fill in the bucket in b and advance a.
if aCount == 0 {
bInter.num++ // Mark that we need to insert a bucket in b.
bInter.bucketIdx = aIdx
// Advance a
if aInter.num > 0 {
aInserts = append(aInserts, aInter)
@ -454,6 +455,7 @@ loop:
return nil, nil, false
case aIdx > bIdx: // a misses a value that is in b. Forward b and recompare.
aInter.num++
aInter.bucketIdx = bIdx
// Advance b
if bInter.num > 0 {
bInserts = append(bInserts, bInter)
@ -471,6 +473,7 @@ loop:
// fill in the bucket in b and advance a.
if aCount == 0 {
bInter.num++
bInter.bucketIdx = aIdx
// Advance a
if aInter.num > 0 {
aInserts = append(aInserts, aInter)
@ -489,6 +492,7 @@ loop:
return nil, nil, false
case !aOK && bOK: // a misses a value that is in b. Forward b and recompare.
aInter.num++
aInter.bucketIdx = bIdx
// Advance b
if bInter.num > 0 {
bInserts = append(bInserts, bInter)
@ -807,6 +811,22 @@ func (a *HistogramAppender) AppendHistogram(prev *HistogramAppender, t int64, h
happ.appendHistogram(t, h)
return newChunk, false, app, nil
}
if len(pBackwardInserts) > 0 || len(nBackwardInserts) > 0 {
// The histogram needs to be expanded to have the extra empty buckets
// of the chunk.
if len(pForwardInserts) == 0 && len(nForwardInserts) == 0 {
// No new chunks from the histogram, so the spans of the appender can accommodate the new buckets.
h.PositiveSpans = resize(h.PositiveSpans, len(a.pSpans))
copy(h.PositiveSpans, a.pSpans)
h.NegativeSpans = resize(h.NegativeSpans, len(a.nSpans))
copy(h.NegativeSpans, a.nSpans)
} else {
// Spans need pre-adjusting to accommodate the new buckets.
h.PositiveSpans = adjustForInserts(h.PositiveSpans, pBackwardInserts)
h.NegativeSpans = adjustForInserts(h.NegativeSpans, nBackwardInserts)
}
a.recodeHistogram(h, pBackwardInserts, nBackwardInserts)
}
if len(pForwardInserts) > 0 || len(nForwardInserts) > 0 {
if appendOnly {
return nil, false, a, fmt.Errorf("histogram layout change with %d positive and %d negative forwards inserts", len(pForwardInserts), len(nForwardInserts))
@ -818,13 +838,6 @@ func (a *HistogramAppender) AppendHistogram(prev *HistogramAppender, t int64, h
app.(*HistogramAppender).appendHistogram(t, h)
return chk, true, app, nil
}
if len(pBackwardInserts) > 0 || len(nBackwardInserts) > 0 {
// The histogram needs to be expanded to have the extra empty buckets
// of the chunk.
h.PositiveSpans = a.pSpans
h.NegativeSpans = a.nSpans
a.recodeHistogram(h, pBackwardInserts, nBackwardInserts)
}
a.appendHistogram(t, h)
return nil, false, a, nil
}

View file

@ -278,6 +278,10 @@ func (b *bucketIterator) Next() (int, bool) {
type Insert struct {
pos int
num int
// Optional: bucketIdx is the index of the bucket that is inserted.
// Can be used to adjust spans.
bucketIdx int
}
// Deprecated: expandSpansForward, use expandIntSpansAndBuckets or
@ -577,3 +581,65 @@ func counterResetHint(crh CounterResetHeader, numRead uint16) histogram.CounterR
return histogram.UnknownCounterReset
}
}
// adjustForInserts adjusts the spans for the given inserts.
func adjustForInserts(spans []histogram.Span, inserts []Insert) (mergedSpans []histogram.Span) {
if len(inserts) == 0 {
return spans
}
it := newBucketIterator(spans)
var (
lastBucket int
i int
insertIdx int = inserts[i].bucketIdx
insertNum int = inserts[i].num
)
addBucket := func(b int) {
offset := b - lastBucket - 1
if offset == 0 && len(mergedSpans) > 0 {
mergedSpans[len(mergedSpans)-1].Length++
} else {
if len(mergedSpans) == 0 {
offset++
}
mergedSpans = append(mergedSpans, histogram.Span{
Offset: int32(offset),
Length: 1,
})
}
lastBucket = b
}
consumeInsert := func() {
// Consume the insert.
insertNum--
if insertNum == 0 {
i++
if i < len(inserts) {
insertIdx = inserts[i].bucketIdx
insertNum = inserts[i].num
}
} else {
insertIdx++
}
}
bucket, ok := it.Next()
for ok {
if i < len(inserts) && insertIdx < bucket {
addBucket(insertIdx)
consumeInsert()
} else {
addBucket(bucket)
bucket, ok = it.Next()
}
}
for i < len(inserts) {
addBucket(inserts[i].bucketIdx)
consumeInsert()
}
return
}

View file

@ -471,7 +471,13 @@ func TestHistogramChunkAppendable(t *testing.T) {
// Check that h2 was recoded.
require.Equal(t, []int64{7, -7, 2, 1, -3, 3, 1, 1}, h2.PositiveBuckets) // counts: 7, 0, 2, 3 , 0, 3, 5 (total 23)
require.Equal(t, emptyBucketH.PositiveSpans, h2.PositiveSpans)
require.Equal(t, []histogram.Span{
{Offset: 0, Length: 2}, // Added empty bucket.
{Offset: 2, Length: 1}, // Existing - offset adjusted.
{Offset: 3, Length: 2}, // Added empty bucket.
{Offset: 3, Length: 1}, // Existing - offset adjusted.
{Offset: 1, Length: 2}, // Existing.
}, h2.PositiveSpans)
}
{ // New histogram that has a counter reset while buckets are same.