tsdb: Find union of two sets of histogram spans

Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com>
This commit is contained in:
Ganesh Vernekar 2023-01-04 15:35:31 +05:30
parent d7f5129042
commit 8ad0d2d5d7
No known key found for this signature in database
GPG key ID: F056451B52F1DC34
4 changed files with 175 additions and 18 deletions

View file

@ -292,6 +292,7 @@ func (a *FloatHistogramAppender) Appendable(h *histogram.FloatHistogram) (
func (a *FloatHistogramAppender) AppendableGauge(h *histogram.FloatHistogram) ( func (a *FloatHistogramAppender) AppendableGauge(h *histogram.FloatHistogram) (
positiveInterjections, negativeInterjections []Interjection, positiveInterjections, negativeInterjections []Interjection,
backwardPositiveInterjections, backwardNegativeInterjections []Interjection, backwardPositiveInterjections, backwardNegativeInterjections []Interjection,
positiveSpans, negativeSpans []histogram.Span,
okToAppend bool, okToAppend bool,
) { ) {
if value.IsStaleNaN(h.Sum) { if value.IsStaleNaN(h.Sum) {
@ -309,8 +310,8 @@ func (a *FloatHistogramAppender) AppendableGauge(h *histogram.FloatHistogram) (
return return
} }
positiveInterjections, backwardPositiveInterjections = bidirectionalCompareSpans(a.pSpans, h.PositiveSpans) positiveInterjections, backwardPositiveInterjections, positiveSpans = bidirectionalCompareSpans(a.pSpans, h.PositiveSpans)
negativeInterjections, backwardNegativeInterjections = bidirectionalCompareSpans(a.nSpans, h.NegativeSpans) negativeInterjections, backwardNegativeInterjections, negativeSpans = bidirectionalCompareSpans(a.nSpans, h.NegativeSpans)
okToAppend = true okToAppend = true
return return
} }

View file

@ -390,7 +390,7 @@ func TestFloatHistogramChunkAppendableGauge(t *testing.T) {
h2 := h1.Copy() h2 := h1.Copy()
h2.Schema++ h2.Schema++
hApp, _ := app.(*FloatHistogramAppender) hApp, _ := app.(*FloatHistogramAppender)
_, _, _, _, ok := hApp.AppendableGauge(h2) _, _, _, _, _, _, ok := hApp.AppendableGauge(h2)
require.False(t, ok) require.False(t, ok)
} }
@ -398,7 +398,7 @@ func TestFloatHistogramChunkAppendableGauge(t *testing.T) {
h2 := h1.Copy() h2 := h1.Copy()
h2.ZeroThreshold += 0.1 h2.ZeroThreshold += 0.1
hApp, _ := app.(*FloatHistogramAppender) hApp, _ := app.(*FloatHistogramAppender)
_, _, _, _, ok := hApp.AppendableGauge(h2) _, _, _, _, _, _, ok := hApp.AppendableGauge(h2)
require.False(t, ok) require.False(t, ok)
} }
@ -416,7 +416,7 @@ func TestFloatHistogramChunkAppendableGauge(t *testing.T) {
h2.PositiveBuckets = []float64{7, 5, 1, 3, 1, 0, 2, 5, 5, 0, 1} h2.PositiveBuckets = []float64{7, 5, 1, 3, 1, 0, 2, 5, 5, 0, 1}
hApp, _ := app.(*FloatHistogramAppender) hApp, _ := app.(*FloatHistogramAppender)
pI, nI, pBackwardI, nBackwardI, ok := hApp.AppendableGauge(h2) pI, nI, pBackwardI, nBackwardI, _, _, ok := hApp.AppendableGauge(h2)
require.Greater(t, len(pI), 0) require.Greater(t, len(pI), 0)
require.Len(t, nI, 0) require.Len(t, nI, 0)
require.Len(t, pBackwardI, 0) require.Len(t, pBackwardI, 0)
@ -438,7 +438,7 @@ func TestFloatHistogramChunkAppendableGauge(t *testing.T) {
h2.PositiveBuckets = []float64{6, 3, 3, 2, 5, 1} h2.PositiveBuckets = []float64{6, 3, 3, 2, 5, 1}
hApp, _ := app.(*FloatHistogramAppender) hApp, _ := app.(*FloatHistogramAppender)
pI, nI, pBackwardI, nBackwardI, ok := hApp.AppendableGauge(h2) pI, nI, pBackwardI, nBackwardI, _, _, ok := hApp.AppendableGauge(h2)
require.Len(t, pI, 0) require.Len(t, pI, 0)
require.Len(t, nI, 0) require.Len(t, nI, 0)
require.Greater(t, len(pBackwardI), 0) require.Greater(t, len(pBackwardI), 0)
@ -458,7 +458,7 @@ func TestFloatHistogramChunkAppendableGauge(t *testing.T) {
h2.PositiveBuckets = []float64{6, 3, 2, 4, 5, 1} h2.PositiveBuckets = []float64{6, 3, 2, 4, 5, 1}
hApp, _ := app.(*FloatHistogramAppender) hApp, _ := app.(*FloatHistogramAppender)
pI, nI, pBackwardI, nBackwardI, ok := hApp.AppendableGauge(h2) pI, nI, pBackwardI, nBackwardI, _, _, ok := hApp.AppendableGauge(h2)
require.Greater(t, len(pI), 0) require.Greater(t, len(pI), 0)
require.Greater(t, len(pBackwardI), 0) require.Greater(t, len(pBackwardI), 0)
require.Len(t, nI, 0) require.Len(t, nI, 0)
@ -472,7 +472,7 @@ func TestFloatHistogramChunkAppendableGauge(t *testing.T) {
h2.PositiveBuckets = []float64{6, 2, 3, 2, 4, 5, 1} h2.PositiveBuckets = []float64{6, 2, 3, 2, 4, 5, 1}
hApp, _ := app.(*FloatHistogramAppender) hApp, _ := app.(*FloatHistogramAppender)
pI, nI, pBackwardI, nBackwardI, ok := hApp.AppendableGauge(h2) pI, nI, pBackwardI, nBackwardI, _, _, ok := hApp.AppendableGauge(h2)
require.Len(t, pI, 0) require.Len(t, pI, 0)
require.Len(t, nI, 0) require.Len(t, nI, 0)
require.Len(t, pBackwardI, 0) require.Len(t, pBackwardI, 0)
@ -492,7 +492,7 @@ func TestFloatHistogramChunkAppendableGauge(t *testing.T) {
h2.PositiveBuckets = []float64{7, 5, 1, 3, 1, 0, 2, 5, 5, 0, 0} h2.PositiveBuckets = []float64{7, 5, 1, 3, 1, 0, 2, 5, 5, 0, 0}
hApp, _ := app.(*FloatHistogramAppender) hApp, _ := app.(*FloatHistogramAppender)
pI, nI, pBackwardI, nBackwardI, ok := hApp.AppendableGauge(h2) pI, nI, pBackwardI, nBackwardI, _, _, ok := hApp.AppendableGauge(h2)
require.Greater(t, len(pI), 0) require.Greater(t, len(pI), 0)
require.Len(t, nI, 0) require.Len(t, nI, 0)
require.Len(t, pBackwardI, 0) require.Len(t, pBackwardI, 0)
@ -516,7 +516,7 @@ func TestFloatHistogramChunkAppendableGauge(t *testing.T) {
h2.PositiveBuckets = []float64{1, 2, 5, 3, 3, 2, 4, 5, 1} h2.PositiveBuckets = []float64{1, 2, 5, 3, 3, 2, 4, 5, 1}
hApp, _ := app.(*FloatHistogramAppender) hApp, _ := app.(*FloatHistogramAppender)
pI, nI, pBackwardI, nBackwardI, ok := hApp.AppendableGauge(h2) pI, nI, pBackwardI, nBackwardI, _, _, ok := hApp.AppendableGauge(h2)
require.Greater(t, len(pI), 0) require.Greater(t, len(pI), 0)
require.Len(t, nI, 0) require.Len(t, nI, 0)
require.Len(t, pBackwardI, 0) require.Len(t, pBackwardI, 0)

View file

@ -165,21 +165,23 @@ func (b *bucketIterator) Next() (int, bool) {
if b.span >= len(b.spans) { if b.span >= len(b.spans) {
return 0, false return 0, false
} }
try: if b.bucket < int(b.spans[b.span].Length)-1 { // Try to move within same span.
if b.bucket < int(b.spans[b.span].Length-1) { // Try to move within same span.
b.bucket++ b.bucket++
b.idx++ b.idx++
return b.idx, true return b.idx, true
} else if b.span < len(b.spans)-1 { // Try to move from one span to the next. }
for b.span < len(b.spans)-1 { // Try to move from one span to the next.
b.span++ b.span++
b.idx += int(b.spans[b.span].Offset + 1) b.idx += int(b.spans[b.span].Offset + 1)
b.bucket = 0 b.bucket = 0
if b.spans[b.span].Length == 0 { if b.spans[b.span].Length == 0 {
// Pathological case that should never happen. We can't use this span, let's try again. b.idx--
goto try continue
} }
return b.idx, true return b.idx, true
} }
// We're out of options. // We're out of options.
return 0, false return 0, false
} }
@ -280,11 +282,28 @@ loop:
// bidirectionalCompareSpans does everything that forwardCompareSpans does and // bidirectionalCompareSpans does everything that forwardCompareSpans does and
// also returns interjections in the other direction (i.e. buckets missing in b that are present in a). // also returns interjections in the other direction (i.e. buckets missing in b that are present in a).
func bidirectionalCompareSpans(a, b []histogram.Span) (forward, backward []Interjection) { func bidirectionalCompareSpans(a, b []histogram.Span) (forward, backward []Interjection, mergedSpans []histogram.Span) {
ai := newBucketIterator(a) ai := newBucketIterator(a)
bi := newBucketIterator(b) bi := newBucketIterator(b)
var interjections, bInterjections []Interjection var interjections, bInterjections []Interjection
var lastBucket int
addBucket := func(b int) {
offset := b - lastBucket - 1
if offset == 0 && len(mergedSpans) > 0 {
mergedSpans[len(mergedSpans)-1].Length++
} else {
if len(mergedSpans) == 0 {
offset++
}
mergedSpans = append(mergedSpans, histogram.Span{
Offset: int32(offset),
Length: 1,
})
}
lastBucket = b
}
// When inter.num becomes > 0, this becomes a valid interjection that // When inter.num becomes > 0, this becomes a valid interjection that
// should be yielded when we finish a streak of new buckets. // should be yielded when we finish a streak of new buckets.
@ -307,6 +326,7 @@ loop:
bInterjections = append(bInterjections, bInter) bInterjections = append(bInterjections, bInter)
bInter.num = 0 bInter.num = 0
} }
addBucket(av)
av, aOK = ai.Next() av, aOK = ai.Next()
bv, bOK = bi.Next() bv, bOK = bi.Next()
inter.pos++ inter.pos++
@ -319,6 +339,7 @@ loop:
interjections = append(interjections, inter) interjections = append(interjections, inter)
inter.num = 0 inter.num = 0
} }
addBucket(av)
inter.pos++ inter.pos++
av, aOK = ai.Next() av, aOK = ai.Next()
case av > bv: // a misses a value that is in b. Forward b and recompare. case av > bv: // a misses a value that is in b. Forward b and recompare.
@ -329,14 +350,17 @@ loop:
bInterjections = append(bInterjections, bInter) bInterjections = append(bInterjections, bInter)
bInter.num = 0 bInter.num = 0
} }
addBucket(bv)
bInter.pos++ bInter.pos++
bv, bOK = bi.Next() bv, bOK = bi.Next()
} }
case aOK && !bOK: // b misses a value that is in a. case aOK && !bOK: // b misses a value that is in a.
bInter.num++ bInter.num++
addBucket(av)
av, aOK = ai.Next() av, aOK = ai.Next()
case !aOK && bOK: // a misses a value that is in b. Forward b and recompare. case !aOK && bOK: // a misses a value that is in b. Forward b and recompare.
inter.num++ inter.num++
addBucket(bv)
bv, bOK = bi.Next() bv, bOK = bi.Next()
default: // Both iterators ran out. We're done. default: // Both iterators ran out. We're done.
if inter.num > 0 { if inter.num > 0 {
@ -349,7 +373,7 @@ loop:
} }
} }
return interjections, bInterjections return interjections, bInterjections, mergedSpans
} }
// interject merges 'in' with the provided interjections and writes them into // interject merges 'in' with the provided interjections and writes them into

View file

@ -330,7 +330,7 @@ func TestCompareSpansAndInterject(t *testing.T) {
for _, s := range scenarios { for _, s := range scenarios {
t.Run(s.description, func(t *testing.T) { t.Run(s.description, func(t *testing.T) {
if len(s.backwardInterjections) > 0 { if len(s.backwardInterjections) > 0 {
interjections, bInterjections := bidirectionalCompareSpans(s.spansA, s.spansB) interjections, bInterjections, _ := bidirectionalCompareSpans(s.spansA, s.spansB)
require.Equal(t, s.interjections, interjections) require.Equal(t, s.interjections, interjections)
require.Equal(t, s.backwardInterjections, bInterjections) require.Equal(t, s.backwardInterjections, bInterjections)
} }
@ -441,3 +441,135 @@ func TestWriteReadHistogramChunkLayout(t *testing.T) {
require.Equal(t, want.negativeSpans, gotNegativeSpans) require.Equal(t, want.negativeSpans, gotNegativeSpans)
} }
} }
// TestSpansFromBidirectionalCompareSpans checks the merged span layout that
// bidirectionalCompareSpans returns as its third result, i.e. the union of the
// buckets covered by the two inputs. Every case is evaluated in both argument
// orders because the union must not depend on which side is "a" and which is
// "b", and the inputs are verified to be left unmodified.
func TestSpansFromBidirectionalCompareSpans(t *testing.T) {
	cases := []struct {
		name        string
		s1, s2, exp []histogram.Span
	}{
		{
			// No buckets on either side; exp is left nil because no span is
			// ever appended to the result.
			name: "all empty",
			s1:   []histogram.Span{},
			s2:   []histogram.Span{},
		},
		{
			// Identical, non-empty layouts: the union is the layout itself.
			name: "same spans",
			s1: []histogram.Span{
				{Offset: 0, Length: 3},
				{Offset: 3, Length: 3},
				{Offset: 5, Length: 3},
			},
			s2: []histogram.Span{
				{Offset: 0, Length: 3},
				{Offset: 3, Length: 3},
				{Offset: 5, Length: 3},
			},
			exp: []histogram.Span{
				{Offset: 0, Length: 3},
				{Offset: 3, Length: 3},
				{Offset: 5, Length: 3},
			},
		},
		{
			// Has the cases of
			// 1.    |----|    (partial overlap)
			//    |----|
			//
			// 2. |-----|      (no gap but no overlap as well)
			//          |---|
			//
			// 3. |----|       (complete overlap)
			//    |----|
			name: "partial overlap, adjacency and complete overlap",
			s1: []histogram.Span{
				{Offset: 0, Length: 3},
				{Offset: 3, Length: 3},
				{Offset: 5, Length: 3},
			},
			s2: []histogram.Span{
				{Offset: 0, Length: 2},
				{Offset: 2, Length: 2},
				{Offset: 2, Length: 3},
				{Offset: 3, Length: 3},
			},
			exp: []histogram.Span{
				{Offset: 0, Length: 3},
				{Offset: 1, Length: 7},
				{Offset: 3, Length: 3},
			},
		},
		{
			name: "s1 is superset of s2",
			s1: []histogram.Span{
				{Offset: 0, Length: 3},
				{Offset: 3, Length: 5},
				{Offset: 3, Length: 3},
			},
			s2: []histogram.Span{
				{Offset: 0, Length: 2},
				{Offset: 5, Length: 3},
				{Offset: 4, Length: 3},
			},
			exp: []histogram.Span{
				{Offset: 0, Length: 3},
				{Offset: 3, Length: 5},
				{Offset: 3, Length: 3},
			},
		},
		{
			name: "no overlaps but one span is side by side",
			s1: []histogram.Span{
				{Offset: 0, Length: 3},
				{Offset: 3, Length: 3},
				{Offset: 5, Length: 3},
			},
			s2: []histogram.Span{
				{Offset: 3, Length: 3},
				{Offset: 4, Length: 2},
			},
			exp: []histogram.Span{
				{Offset: 0, Length: 9},
				{Offset: 1, Length: 2},
				{Offset: 2, Length: 3},
			},
		},
		{
			name: "no buckets in one of them",
			s1: []histogram.Span{
				{Offset: 0, Length: 3},
				{Offset: 3, Length: 3},
				{Offset: 5, Length: 3},
			},
			s2: []histogram.Span{},
			exp: []histogram.Span{
				{Offset: 0, Length: 3},
				{Offset: 3, Length: 3},
				{Offset: 5, Length: 3},
			},
		},
		{
			// Zero-length spans contribute no buckets but their offsets still
			// shift the position of the spans that follow them.
			name: "zero length spans",
			s1: []histogram.Span{
				{Offset: -5, Length: 0},
				{Offset: 2, Length: 0},
				{Offset: 3, Length: 3},
				{Offset: 1, Length: 0},
				{Offset: 2, Length: 3},
				{Offset: 2, Length: 0},
				{Offset: 2, Length: 0},
				{Offset: 1, Length: 3},
				{Offset: 4, Length: 0},
				{Offset: 5, Length: 0},
			},
			s2: []histogram.Span{
				{Offset: 0, Length: 2},
				{Offset: 2, Length: 2},
				{Offset: 1, Length: 0},
				{Offset: 1, Length: 3},
				{Offset: 3, Length: 3},
			},
			exp: []histogram.Span{
				{Offset: 0, Length: 3},
				{Offset: 1, Length: 7},
				{Offset: 3, Length: 3},
			},
		},
	}

	for _, c := range cases {
		t.Run(c.name, func(t *testing.T) {
			// Snapshot the inputs so that accidental in-place mutation by
			// bidirectionalCompareSpans can be detected afterwards.
			s1c := make([]histogram.Span, len(c.s1))
			s2c := make([]histogram.Span, len(c.s2))
			copy(s1c, c.s1)
			copy(s2c, c.s2)

			_, _, act := bidirectionalCompareSpans(c.s1, c.s2)
			require.Equal(t, c.exp, act)
			// Check that s1 and s2 are not modified.
			require.Equal(t, s1c, c.s1)
			require.Equal(t, s2c, c.s2)

			// The union must be the same with the arguments swapped.
			_, _, act = bidirectionalCompareSpans(c.s2, c.s1)
			require.Equal(t, c.exp, act)
		})
	}
}