From dc1c7441694a73947eeae688226ab611bde7a338 Mon Sep 17 00:00:00 2001 From: beorn7 Date: Mon, 5 Jul 2021 23:01:39 +0200 Subject: [PATCH 1/2] Fix interjections at the end Signed-off-by: beorn7 --- tsdb/chunkenc/histo_meta.go | 39 +++++-- tsdb/chunkenc/histo_meta_test.go | 190 ++++++++++++++++++++++++------- 2 files changed, 181 insertions(+), 48 deletions(-) diff --git a/tsdb/chunkenc/histo_meta.go b/tsdb/chunkenc/histo_meta.go index 17bbef582..91506743b 100644 --- a/tsdb/chunkenc/histo_meta.go +++ b/tsdb/chunkenc/histo_meta.go @@ -186,6 +186,7 @@ func compareSpans(a, b []histogram.Span) ([]Interjection, bool) { return interjections, false } else if !aok && bok { // a misses a value that is in b. forward b and recompare inter.num++ + inter.pos++ bv, bok = bi.Next() continue } else { // both iterators ran out. we're done @@ -199,16 +200,18 @@ func compareSpans(a, b []histogram.Span) ([]Interjection, bool) { return interjections, true } -// caller is responsible for making sure len(in) and len(out) are appropriate for the provided interjections! +// interject merges 'in' with the provided interjections and writes them into +// 'out', which must already have the appropriate length. func interject(in, out []int64, interjections []Interjection) []int64 { - var j int // position in out - var v int64 // the last value seen - var interj int // the next interjection to process + var j int // Position in out. + var v int64 // The last value seen. + var interj int // The next interjection to process. for i, d := range in { if interj < len(interjections) && i == interjections[interj].pos { - // we have an interjection! - // add interjection.num new delta values such as their bucket values equate 0 + // We have an interjection! + // Add interjection.num new delta values such that their + // bucket values equate 0. out[j] = int64(-v) j++ for x := 1; x < interjections[interj].num; x++ { @@ -217,19 +220,35 @@ func interject(in, out []int64, interjections []Interjection) []int64 { } interj++ - // now save the value from the input. the delta value we should save is - // the original delta value + the last value of the point before the interjection (to undo the delta that was introduced by the interjection) + // Now save the value from the input. The delta value we + // should save is the original delta value + the last + // value of the point before the interjection (to undo + // the delta that was introduced by the interjection). out[j] = d + v j++ v = d + v continue } - // if there was no interjection, the original delta is still valid + // If there was no interjection, the original delta is still + // valid. out[j] = d j++ v += d } - + switch interj { + case len(interjections): + // All interjections processed. Nothing more to do. + case len(interjections) - 1: + // One more interjection to process at the end. + out[j] = int64(-v) + j++ + for x := 1; x < interjections[interj].num; x++ { + out[j] = 0 + j++ + } + default: + panic("unprocessed interjections left") + } return out } diff --git a/tsdb/chunkenc/histo_meta_test.go b/tsdb/chunkenc/histo_meta_test.go index 3f830975b..657c783b9 100644 --- a/tsdb/chunkenc/histo_meta_test.go +++ b/tsdb/chunkenc/histo_meta_test.go @@ -108,52 +108,166 @@ func TestBucketIterator(t *testing.T) { } func TestInterjection(t *testing.T) { - // this tests the scenario as described in compareSpans's comments - a := []histogram.Span{ - {Offset: 0, Length: 2}, - {Offset: 2, Length: 1}, - {Offset: 3, Length: 2}, - {Offset: 3, Length: 1}, - {Offset: 1, Length: 1}, - } - b := []histogram.Span{ - {Offset: 0, Length: 3}, - {Offset: 1, Length: 1}, - {Offset: 1, Length: 4}, - {Offset: 3, Length: 3}, - } - interj := []Interjection{ + scenarios := []struct { + description string + spansA, spansB []histogram.Span + valid bool + interjections []Interjection + bucketsIn, bucketsOut []int64 + }{ { - pos: 2, - num: 1, + description: "single prepend at the beginning", + spansA: []histogram.Span{ + {Offset: -10, Length: 3}, + }, + spansB: []histogram.Span{ + {Offset: -11, Length: 4}, + }, + valid: true, + interjections: []Interjection{ + { + pos: 0, + num: 1, + }, + }, + bucketsIn: []int64{6, -3, 0}, + bucketsOut: []int64{0, 6, -3, 0}, }, { - pos: 3, - num: 2, + description: "single append at the end", + spansA: []histogram.Span{ + {Offset: -10, Length: 3}, + }, + spansB: []histogram.Span{ + {Offset: -10, Length: 4}, + }, + valid: true, + interjections: []Interjection{ + { + pos: 3, + num: 1, + }, + }, + bucketsIn: []int64{6, -3, 0}, + bucketsOut: []int64{6, -3, 0, -3}, }, { - pos: 6, - num: 1, + description: "double prepend at the beginning", + spansA: []histogram.Span{ + {Offset: -10, Length: 3}, + }, + spansB: []histogram.Span{ + {Offset: -12, Length: 5}, + }, + valid: true, + interjections: []Interjection{ + { + pos: 0, + num: 2, + }, + }, + bucketsIn: []int64{6, -3, 0}, + bucketsOut: []int64{0, 0, 6, -3, 0}, + }, + { + description: "double append at the end", + spansA: []histogram.Span{ + {Offset: -10, Length: 3}, + }, + spansB: []histogram.Span{ + {Offset: -10, Length: 5}, + }, + valid: true, + interjections: []Interjection{ + { + pos: 3, + num: 2, + }, + }, + bucketsIn: []int64{6, -3, 0}, + bucketsOut: []int64{6, -3, 0, -3, 0}, + }, + { + description: "single removal of bucket at the start", + spansA: []histogram.Span{ + {Offset: -10, Length: 4}, + }, + spansB: []histogram.Span{ + {Offset: -9, Length: 3}, + }, + valid: false, + }, + { + description: "single removal of bucket in the middle", + spansA: []histogram.Span{ + {Offset: -10, Length: 4}, + }, + spansB: []histogram.Span{ + {Offset: -10, Length: 2}, + {Offset: 1, Length: 1}, + }, + valid: false, + }, + { + description: "single removal of bucket at the end", + spansA: []histogram.Span{ + {Offset: -10, Length: 4}, + }, + spansB: []histogram.Span{ + {Offset: -10, Length: 3}, + }, + valid: false, + }, + // TODO(beorn7): Add more scenarios. + { + description: "as described in doc comment", + spansA: []histogram.Span{ + {Offset: 0, Length: 2}, + {Offset: 2, Length: 1}, + {Offset: 3, Length: 2}, + {Offset: 3, Length: 1}, + {Offset: 1, Length: 1}, + }, + spansB: []histogram.Span{ + {Offset: 0, Length: 3}, + {Offset: 1, Length: 1}, + {Offset: 1, Length: 4}, + {Offset: 3, Length: 3}, + }, + valid: true, + interjections: []Interjection{ + { + pos: 2, + num: 1, + }, + { + pos: 3, + num: 2, + }, + { + pos: 6, + num: 1, + }, + }, + bucketsIn: []int64{6, -3, 0, -1, 2, 1, -4}, + bucketsOut: []int64{6, -3, -3, 3, -3, 0, 2, 2, 1, -5, 1}, }, } - testCompareSpans(a, b, interj, t) - testInterject(interj, t) -} -func testCompareSpans(a, b []histogram.Span, exp []Interjection, t *testing.T) { - got, ok := compareSpans(a, b) - require.Equal(t, true, ok) - require.Equal(t, exp, got) -} + for _, s := range scenarios { + t.Run(s.description, func(t *testing.T) { + interjections, valid := compareSpans(s.spansA, s.spansB) + if !s.valid { + require.False(t, valid, "compareScan unexpectedly returned true") + return + } + require.True(t, valid, "compareScan unexpectedly returned false") + require.Equal(t, s.interjections, interjections) -func testInterject(interjections []Interjection, t *testing.T) { - // this tests the scenario as described in compareSpans's comments - // original deltas that represent these counts : 6, 3, 3, 2, 4, 5, 1 - a := []int64{6, -3, 0, -1, 2, 1, -4} - // modified deltas to represent the interjected counts: 6, 3, 0, 3, 0, 0, 2, 4, 5, 0, 1 - exp := []int64{6, -3, -3, 3, -3, 0, 2, 2, 1, -5, 1} - b := make([]int64, len(a)+4) - interject(a, b, interjections) - require.Equal(t, exp, b) + gotBuckets := make([]int64, len(s.bucketsOut)) + interject(s.bucketsIn, gotBuckets, interjections) + require.Equal(t, s.bucketsOut, gotBuckets) + }) + } } From 01957eee2b408327ee91ec7a06c101c6132cc7ca Mon Sep 17 00:00:00 2001 From: beorn7 Date: Mon, 5 Jul 2021 23:59:33 +0200 Subject: [PATCH 2/2] Fix interjections even more Signed-off-by: beorn7 --- tsdb/chunkenc/histo.go | 1 - tsdb/chunkenc/histo_meta.go | 31 +++++++++++++------------------ tsdb/chunkenc/histo_meta_test.go | 23 ++++++++++++++++++++++- tsdb/head.go | 2 +- 4 files changed, 36 insertions(+), 21 deletions(-) diff --git a/tsdb/chunkenc/histo.go b/tsdb/chunkenc/histo.go index 780ed0597..f588278f0 100644 --- a/tsdb/chunkenc/histo.go +++ b/tsdb/chunkenc/histo.go @@ -387,7 +387,6 @@ func (a *HistoAppender) Recode(posInterjections, negInterjections []Interjection } app.AppendHistogram(tOld, hOld) } - return hc, app } diff --git a/tsdb/chunkenc/histo_meta.go b/tsdb/chunkenc/histo_meta.go index 91506743b..3010cfa39 100644 --- a/tsdb/chunkenc/histo_meta.go +++ b/tsdb/chunkenc/histo_meta.go @@ -159,41 +159,36 @@ func compareSpans(a, b []histogram.Span) ([]Interjection, bool) { av, aok := ai.Next() bv, bok := bi.Next() +loop: for { - if aok && bok { - if av == bv { // both have an identical value. move on! - // finish WIP interjection and reset + switch { + case aok && bok: + switch { + case av == bv: // Both have an identical value. move on! + // Finish WIP interjection and reset. if inter.num > 0 { interjections = append(interjections, inter) } inter.num = 0 av, aok = ai.Next() bv, bok = bi.Next() - if aok { - inter.pos++ - } - continue - } - if av < bv { // b misses a value that is in a. + inter.pos++ + case av < bv: // b misses a value that is in a. return interjections, false - } - if av > bv { // a misses a value that is in b. forward b and recompare + case av > bv: // a misses a value that is in b. Forward b and recompare. inter.num++ bv, bok = bi.Next() - continue } - } else if aok && !bok { // b misses a value that is in a. + case aok && !bok: // b misses a value that is in a. return interjections, false - } else if !aok && bok { // a misses a value that is in b. forward b and recompare + case !aok && bok: // a misses a value that is in b. Forward b and recompare. inter.num++ - inter.pos++ bv, bok = bi.Next() - continue - } else { // both iterators ran out. we're done + default: // Both iterators ran out. We're done. if inter.num > 0 { interjections = append(interjections, inter) } - break + break loop } } diff --git a/tsdb/chunkenc/histo_meta_test.go b/tsdb/chunkenc/histo_meta_test.go index 657c783b9..a8339ca2f 100644 --- a/tsdb/chunkenc/histo_meta_test.go +++ b/tsdb/chunkenc/histo_meta_test.go @@ -187,6 +187,28 @@ func TestInterjection(t *testing.T) { bucketsIn: []int64{6, -3, 0}, bucketsOut: []int64{6, -3, 0, -3, 0}, }, + { + description: "double prepond at the beginning and double append at the end", + spansA: []histogram.Span{ + {Offset: -10, Length: 3}, + }, + spansB: []histogram.Span{ + {Offset: -12, Length: 7}, + }, + valid: true, + interjections: []Interjection{ + { + pos: 0, + num: 2, + }, + { + pos: 3, + num: 2, + }, + }, + bucketsIn: []int64{6, -3, 0}, + bucketsOut: []int64{0, 0, 6, -3, 0, -3, 0}, + }, { description: "single removal of bucket at the start", spansA: []histogram.Span{ @@ -218,7 +240,6 @@ func TestInterjection(t *testing.T) { }, valid: false, }, - // TODO(beorn7): Add more scenarios. { description: "as described in doc comment", spansA: []histogram.Span{ diff --git a/tsdb/head.go b/tsdb/head.go index 940d6af83..1475635e7 100644 --- a/tsdb/head.go +++ b/tsdb/head.go @@ -16,7 +16,6 @@ package tsdb import ( "context" "fmt" - "github.com/prometheus/prometheus/pkg/histogram" "math" "path/filepath" "runtime" @@ -32,6 +31,7 @@ import ( "go.uber.org/atomic" "github.com/prometheus/prometheus/pkg/exemplar" + "github.com/prometheus/prometheus/pkg/histogram" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb/chunkenc"