Merge pull request #9055 from prometheus/beorn7/histogram

Fix interjections at the end
This commit is contained in:
Björn Rabenstein 2021-07-06 00:20:02 +02:00 committed by GitHub
commit 6b682541f9
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 215 additions and 67 deletions

View file

@ -387,7 +387,6 @@ func (a *HistoAppender) Recode(posInterjections, negInterjections []Interjection
} }
app.AppendHistogram(tOld, hOld) app.AppendHistogram(tOld, hOld)
} }
return hc, app return hc, app
} }

View file

@ -159,56 +159,54 @@ func compareSpans(a, b []histogram.Span) ([]Interjection, bool) {
av, aok := ai.Next() av, aok := ai.Next()
bv, bok := bi.Next() bv, bok := bi.Next()
loop:
for { for {
if aok && bok { switch {
if av == bv { // both have an identical value. move on! case aok && bok:
// finish WIP interjection and reset switch {
case av == bv: // Both have an identical value. move on!
// Finish WIP interjection and reset.
if inter.num > 0 { if inter.num > 0 {
interjections = append(interjections, inter) interjections = append(interjections, inter)
} }
inter.num = 0 inter.num = 0
av, aok = ai.Next() av, aok = ai.Next()
bv, bok = bi.Next() bv, bok = bi.Next()
if aok {
inter.pos++ inter.pos++
} case av < bv: // b misses a value that is in a.
continue
}
if av < bv { // b misses a value that is in a.
return interjections, false return interjections, false
} case av > bv: // a misses a value that is in b. Forward b and recompare.
if av > bv { // a misses a value that is in b. forward b and recompare
inter.num++ inter.num++
bv, bok = bi.Next() bv, bok = bi.Next()
continue
} }
} else if aok && !bok { // b misses a value that is in a. case aok && !bok: // b misses a value that is in a.
return interjections, false return interjections, false
} else if !aok && bok { // a misses a value that is in b. forward b and recompare case !aok && bok: // a misses a value that is in b. Forward b and recompare.
inter.num++ inter.num++
bv, bok = bi.Next() bv, bok = bi.Next()
continue default: // Both iterators ran out. We're done.
} else { // both iterators ran out. we're done
if inter.num > 0 { if inter.num > 0 {
interjections = append(interjections, inter) interjections = append(interjections, inter)
} }
break break loop
} }
} }
return interjections, true return interjections, true
} }
// caller is responsible for making sure len(in) and len(out) are appropriate for the provided interjections! // interject merges 'in' with the provided interjections and writes them into
// 'out', which must already have the appropriate length.
func interject(in, out []int64, interjections []Interjection) []int64 { func interject(in, out []int64, interjections []Interjection) []int64 {
var j int // position in out var j int // Position in out.
var v int64 // the last value seen var v int64 // The last value seen.
var interj int // the next interjection to process var interj int // The next interjection to process.
for i, d := range in { for i, d := range in {
if interj < len(interjections) && i == interjections[interj].pos { if interj < len(interjections) && i == interjections[interj].pos {
// we have an interjection! // We have an interjection!
// add interjection.num new delta values such as their bucket values equate 0 // Add interjection.num new delta values such that their
// bucket values equate 0.
out[j] = int64(-v) out[j] = int64(-v)
j++ j++
for x := 1; x < interjections[interj].num; x++ { for x := 1; x < interjections[interj].num; x++ {
@ -217,19 +215,35 @@ func interject(in, out []int64, interjections []Interjection) []int64 {
} }
interj++ interj++
// now save the value from the input. the delta value we should save is // Now save the value from the input. The delta value we
// the original delta value + the last value of the point before the interjection (to undo the delta that was introduced by the interjection) // should save is the original delta value + the last
// value of the point before the interjection (to undo
// the delta that was introduced by the interjection).
out[j] = d + v out[j] = d + v
j++ j++
v = d + v v = d + v
continue continue
} }
// if there was no interjection, the original delta is still valid // If there was no interjection, the original delta is still
// valid.
out[j] = d out[j] = d
j++ j++
v += d v += d
} }
switch interj {
case len(interjections):
// All interjections processed. Nothing more to do.
case len(interjections) - 1:
// One more interjection to process at the end.
out[j] = int64(-v)
j++
for x := 1; x < interjections[interj].num; x++ {
out[j] = 0
j++
}
default:
panic("unprocessed interjections left")
}
return out return out
} }

View file

@ -108,21 +108,155 @@ func TestBucketIterator(t *testing.T) {
} }
func TestInterjection(t *testing.T) { func TestInterjection(t *testing.T) {
// this tests the scenario as described in compareSpans's comments scenarios := []struct {
a := []histogram.Span{ description string
spansA, spansB []histogram.Span
valid bool
interjections []Interjection
bucketsIn, bucketsOut []int64
}{
{
description: "single prepend at the beginning",
spansA: []histogram.Span{
{Offset: -10, Length: 3},
},
spansB: []histogram.Span{
{Offset: -11, Length: 4},
},
valid: true,
interjections: []Interjection{
{
pos: 0,
num: 1,
},
},
bucketsIn: []int64{6, -3, 0},
bucketsOut: []int64{0, 6, -3, 0},
},
{
description: "single append at the end",
spansA: []histogram.Span{
{Offset: -10, Length: 3},
},
spansB: []histogram.Span{
{Offset: -10, Length: 4},
},
valid: true,
interjections: []Interjection{
{
pos: 3,
num: 1,
},
},
bucketsIn: []int64{6, -3, 0},
bucketsOut: []int64{6, -3, 0, -3},
},
{
description: "double prepend at the beginning",
spansA: []histogram.Span{
{Offset: -10, Length: 3},
},
spansB: []histogram.Span{
{Offset: -12, Length: 5},
},
valid: true,
interjections: []Interjection{
{
pos: 0,
num: 2,
},
},
bucketsIn: []int64{6, -3, 0},
bucketsOut: []int64{0, 0, 6, -3, 0},
},
{
description: "double append at the end",
spansA: []histogram.Span{
{Offset: -10, Length: 3},
},
spansB: []histogram.Span{
{Offset: -10, Length: 5},
},
valid: true,
interjections: []Interjection{
{
pos: 3,
num: 2,
},
},
bucketsIn: []int64{6, -3, 0},
bucketsOut: []int64{6, -3, 0, -3, 0},
},
{
description: "double prepond at the beginning and double append at the end",
spansA: []histogram.Span{
{Offset: -10, Length: 3},
},
spansB: []histogram.Span{
{Offset: -12, Length: 7},
},
valid: true,
interjections: []Interjection{
{
pos: 0,
num: 2,
},
{
pos: 3,
num: 2,
},
},
bucketsIn: []int64{6, -3, 0},
bucketsOut: []int64{0, 0, 6, -3, 0, -3, 0},
},
{
description: "single removal of bucket at the start",
spansA: []histogram.Span{
{Offset: -10, Length: 4},
},
spansB: []histogram.Span{
{Offset: -9, Length: 3},
},
valid: false,
},
{
description: "single removal of bucket in the middle",
spansA: []histogram.Span{
{Offset: -10, Length: 4},
},
spansB: []histogram.Span{
{Offset: -10, Length: 2},
{Offset: 1, Length: 1},
},
valid: false,
},
{
description: "single removal of bucket at the end",
spansA: []histogram.Span{
{Offset: -10, Length: 4},
},
spansB: []histogram.Span{
{Offset: -10, Length: 3},
},
valid: false,
},
{
description: "as described in doc comment",
spansA: []histogram.Span{
{Offset: 0, Length: 2}, {Offset: 0, Length: 2},
{Offset: 2, Length: 1}, {Offset: 2, Length: 1},
{Offset: 3, Length: 2}, {Offset: 3, Length: 2},
{Offset: 3, Length: 1}, {Offset: 3, Length: 1},
{Offset: 1, Length: 1}, {Offset: 1, Length: 1},
} },
b := []histogram.Span{ spansB: []histogram.Span{
{Offset: 0, Length: 3}, {Offset: 0, Length: 3},
{Offset: 1, Length: 1}, {Offset: 1, Length: 1},
{Offset: 1, Length: 4}, {Offset: 1, Length: 4},
{Offset: 3, Length: 3}, {Offset: 3, Length: 3},
} },
interj := []Interjection{ valid: true,
interjections: []Interjection{
{ {
pos: 2, pos: 2,
num: 1, num: 1,
@ -135,25 +269,26 @@ func TestInterjection(t *testing.T) {
pos: 6, pos: 6,
num: 1, num: 1,
}, },
} },
testCompareSpans(a, b, interj, t) bucketsIn: []int64{6, -3, 0, -1, 2, 1, -4},
testInterject(interj, t) bucketsOut: []int64{6, -3, -3, 3, -3, 0, 2, 2, 1, -5, 1},
},
} }
func testCompareSpans(a, b []histogram.Span, exp []Interjection, t *testing.T) { for _, s := range scenarios {
got, ok := compareSpans(a, b) t.Run(s.description, func(t *testing.T) {
require.Equal(t, true, ok) interjections, valid := compareSpans(s.spansA, s.spansB)
require.Equal(t, exp, got) if !s.valid {
require.False(t, valid, "compareScan unexpectedly returned true")
return
} }
require.True(t, valid, "compareScan unexpectedly returned false")
require.Equal(t, s.interjections, interjections)
func testInterject(interjections []Interjection, t *testing.T) { gotBuckets := make([]int64, len(s.bucketsOut))
// this tests the scenario as described in compareSpans's comments interject(s.bucketsIn, gotBuckets, interjections)
// original deltas that represent these counts : 6, 3, 3, 2, 4, 5, 1 require.Equal(t, s.bucketsOut, gotBuckets)
a := []int64{6, -3, 0, -1, 2, 1, -4}
// modified deltas to represent the interjected counts: 6, 3, 0, 3, 0, 0, 2, 4, 5, 0, 1
exp := []int64{6, -3, -3, 3, -3, 0, 2, 2, 1, -5, 1}
b := make([]int64, len(a)+4)
interject(a, b, interjections)
require.Equal(t, exp, b)
})
}
} }

View file

@ -16,7 +16,6 @@ package tsdb
import ( import (
"context" "context"
"fmt" "fmt"
"github.com/prometheus/prometheus/pkg/histogram"
"math" "math"
"path/filepath" "path/filepath"
"runtime" "runtime"
@ -32,6 +31,7 @@ import (
"go.uber.org/atomic" "go.uber.org/atomic"
"github.com/prometheus/prometheus/pkg/exemplar" "github.com/prometheus/prometheus/pkg/exemplar"
"github.com/prometheus/prometheus/pkg/histogram"
"github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb/chunkenc" "github.com/prometheus/prometheus/tsdb/chunkenc"