Add unit test for histogram append and various querying scenarios (#11194)

* Add unit test for histogram append and various querying scenarios

Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com>

* make lint happy

Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com>

* Fix tests

Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com>

* Fix review comments

Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com>

Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com>
This commit is contained in:
Ganesh Vernekar 2022-08-29 15:35:03 +05:30 committed by GitHub
parent f7df3b86ba
commit d209a29a5b
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 374 additions and 15 deletions

View file

@ -29,6 +29,7 @@ import (
"github.com/go-kit/log"
"github.com/stretchr/testify/require"
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb/chunkenc"
@ -515,8 +516,67 @@ const (
defaultLabelValue = "labelValue"
)
// genSeries generates series with a given number of labels and values.
// genSeries generates series of float64 samples with a given number of labels and values.
func genSeries(totalSeries, labelCount int, mint, maxt int64) []storage.Series {
return genSeriesFromSampleGenerator(totalSeries, labelCount, mint, maxt, 1, func(ts int64) tsdbutil.Sample {
return sample{t: ts, v: rand.Float64()}
})
}
// genHistogramSeries generates series of histogram samples with a given number of labels and values.
func genHistogramSeries(totalSeries, labelCount int, mint, maxt, step int64) []storage.Series {
return genSeriesFromSampleGenerator(totalSeries, labelCount, mint, maxt, step, func(ts int64) tsdbutil.Sample {
h := &histogram.Histogram{
Count: 5 + uint64(ts*4),
ZeroCount: 2 + uint64(ts),
ZeroThreshold: 0.001,
Sum: 18.4 * rand.Float64(),
Schema: 1,
PositiveSpans: []histogram.Span{
{Offset: 0, Length: 2},
{Offset: 1, Length: 2},
},
PositiveBuckets: []int64{int64(ts + 1), 1, -1, 0},
}
return sample{t: ts, h: h}
})
}
// genHistogramAndFloatSeries generates series of mixed histogram and float64 samples with a given number of labels and values.
func genHistogramAndFloatSeries(totalSeries, labelCount int, mint, maxt, step int64) []storage.Series {
floatSample := false
count := 0
return genSeriesFromSampleGenerator(totalSeries, labelCount, mint, maxt, step, func(ts int64) tsdbutil.Sample {
count++
var s sample
if floatSample {
s = sample{t: ts, v: rand.Float64()}
} else {
h := &histogram.Histogram{
Count: 5 + uint64(ts*4),
ZeroCount: 2 + uint64(ts),
ZeroThreshold: 0.001,
Sum: 18.4 * rand.Float64(),
Schema: 1,
PositiveSpans: []histogram.Span{
{Offset: 0, Length: 2},
{Offset: 1, Length: 2},
},
PositiveBuckets: []int64{int64(ts + 1), 1, -1, 0},
}
s = sample{t: ts, h: h}
}
if count%5 == 0 {
// Flip the sample type for every 5 samples.
floatSample = !floatSample
}
return s
})
}
func genSeriesFromSampleGenerator(totalSeries, labelCount int, mint, maxt, step int64, generator func(ts int64) tsdbutil.Sample) []storage.Series {
if totalSeries == 0 || labelCount == 0 {
return nil
}
@ -530,8 +590,8 @@ func genSeries(totalSeries, labelCount int, mint, maxt int64) []storage.Series {
lbls[defaultLabelName+strconv.Itoa(j)] = defaultLabelValue + strconv.Itoa(j)
}
samples := make([]tsdbutil.Sample, 0, maxt-mint+1)
for t := mint; t < maxt; t++ {
samples = append(samples, sample{t: t, v: rand.Float64()})
for t := mint; t < maxt; t += step {
samples = append(samples, generator(t))
}
series[i] = storage.NewListSeries(labels.FromMap(lbls), samples)
}

View file

@ -40,6 +40,7 @@ import (
"github.com/stretchr/testify/require"
"go.uber.org/goleak"
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/metadata"
"github.com/prometheus/prometheus/storage"
@ -92,10 +93,17 @@ func query(t testing.TB, q storage.Querier, matchers ...*labels.Matcher) map[str
samples := []tsdbutil.Sample{}
it := series.Iterator()
for it.Next() == chunkenc.ValFloat {
// TODO(beorn7): Also handle histograms.
t, v := it.At()
samples = append(samples, sample{t: t, v: v})
for typ := it.Next(); typ != chunkenc.ValNone; typ = it.Next() {
switch typ {
case chunkenc.ValFloat:
ts, v := it.At()
samples = append(samples, sample{t: ts, v: v})
case chunkenc.ValHistogram:
ts, h := it.AtHistogram()
samples = append(samples, sample{t: ts, h: h})
default:
t.Fatalf("unknown sample type in query %s", typ.String())
}
}
require.NoError(t, it.Err())
@ -3742,3 +3750,276 @@ func TestMetadataAssertInMemoryData(t *testing.T) {
require.Equal(t, reopenDB.head.series.getByHash(s3.Hash(), s3).meta, m3)
require.Equal(t, reopenDB.head.series.getByHash(s4.Hash(), s4).meta, m4)
}
func TestHistogramAppendAndQuery(t *testing.T) {
db := openTestDB(t, nil, nil)
minute := func(m int) int64 { return int64(m) * time.Minute.Milliseconds() }
t.Cleanup(func() {
require.NoError(t, db.Close())
})
ctx := context.Background()
appendHistogram := func(lbls labels.Labels, tsMinute int, h *histogram.Histogram, exp *[]tsdbutil.Sample) {
t.Helper()
app := db.Appender(ctx)
_, err := app.AppendHistogram(0, lbls, minute(tsMinute), h)
require.NoError(t, err)
require.NoError(t, app.Commit())
*exp = append(*exp, sample{t: minute(tsMinute), h: h.Copy()})
}
appendFloat := func(lbls labels.Labels, tsMinute int, val float64, exp *[]tsdbutil.Sample) {
t.Helper()
app := db.Appender(ctx)
_, err := app.Append(0, lbls, minute(tsMinute), val)
require.NoError(t, err)
require.NoError(t, app.Commit())
*exp = append(*exp, sample{t: minute(tsMinute), v: val})
}
testQuery := func(name, value string, exp map[string][]tsdbutil.Sample) {
t.Helper()
q, err := db.Querier(ctx, math.MinInt64, math.MaxInt64)
require.NoError(t, err)
act := query(t, q, labels.MustNewMatcher(labels.MatchRegexp, name, value))
require.Equal(t, exp, act)
}
baseH := &histogram.Histogram{
Count: 11,
ZeroCount: 4,
ZeroThreshold: 0.001,
Sum: 35.5,
Schema: 1,
PositiveSpans: []histogram.Span{
{Offset: 0, Length: 2},
{Offset: 2, Length: 2},
},
PositiveBuckets: []int64{1, 1, -1, 0},
NegativeSpans: []histogram.Span{
{Offset: 0, Length: 1},
{Offset: 1, Length: 2},
},
NegativeBuckets: []int64{1, 2, -1},
}
var (
series1 = labels.FromStrings("foo", "bar1")
series2 = labels.FromStrings("foo", "bar2")
series3 = labels.FromStrings("foo", "bar3")
series4 = labels.FromStrings("foo", "bar4")
exp1, exp2, exp3, exp4 []tsdbutil.Sample
)
// TODO(codesome): test everything for negative buckets as well.
t.Run("series with only histograms", func(t *testing.T) {
h := baseH.Copy() // This is shared across all sub tests.
appendHistogram(series1, 100, h.Copy(), &exp1)
testQuery("foo", "bar1", map[string][]tsdbutil.Sample{series1.String(): exp1})
h.PositiveBuckets[0]++
h.NegativeBuckets[0] += 2
h.Count += 10
appendHistogram(series1, 101, h.Copy(), &exp1)
testQuery("foo", "bar1", map[string][]tsdbutil.Sample{series1.String(): exp1})
t.Run("changing schema", func(t *testing.T) {
h.Schema = 2
appendHistogram(series1, 102, h.Copy(), &exp1)
testQuery("foo", "bar1", map[string][]tsdbutil.Sample{series1.String(): exp1})
// Schema back to old.
h.Schema = 1
appendHistogram(series1, 103, h.Copy(), &exp1)
testQuery("foo", "bar1", map[string][]tsdbutil.Sample{series1.String(): exp1})
})
t.Run("new buckets incoming", func(t *testing.T) {
// This histogram with new bucket at the end causes the re-encoding of the previous histogram.
// Hence the previous histogram is recoded into this new layout.
// But the query returns the histogram from the in-memory buffer, hence we don't see the recode here yet.
h.PositiveSpans[1].Length++
h.PositiveBuckets = append(h.PositiveBuckets, 1)
h.Count += 3
appendHistogram(series1, 104, h.Copy(), &exp1)
testQuery("foo", "bar1", map[string][]tsdbutil.Sample{series1.String(): exp1})
// Now we add the new buckets in between. Empty bucket is again not present for the old histogram.
h.PositiveSpans[0].Length++
h.PositiveSpans[1].Offset--
h.Count += 3
// {2, 1, -1, 0, 1} -> {2, 1, 0, -1, 0, 1}
h.PositiveBuckets = append(h.PositiveBuckets[:2], append([]int64{0}, h.PositiveBuckets[2:]...)...)
appendHistogram(series1, 105, h.Copy(), &exp1)
testQuery("foo", "bar1", map[string][]tsdbutil.Sample{series1.String(): exp1})
// We add 4 more histograms to clear out the buffer and see the re-encoded histograms.
appendHistogram(series1, 106, h.Copy(), &exp1)
appendHistogram(series1, 107, h.Copy(), &exp1)
appendHistogram(series1, 108, h.Copy(), &exp1)
appendHistogram(series1, 109, h.Copy(), &exp1)
// Update the expected histograms to reflect the re-encoding.
l := len(exp1)
h7 := exp1[l-7].H()
h7.PositiveSpans = exp1[l-1].H().PositiveSpans
h7.PositiveBuckets = []int64{2, 1, -3, 2, 0, -2} // -3 and -2 are the empty buckets.
exp1[l-7] = sample{t: exp1[l-7].T(), h: h7}
h6 := exp1[l-6].H()
h6.PositiveSpans = exp1[l-1].H().PositiveSpans
h6.PositiveBuckets = []int64{2, 1, -3, 2, 0, 1} // -3 is the empty bucket.
exp1[l-6] = sample{t: exp1[l-6].T(), h: h6}
testQuery("foo", "bar1", map[string][]tsdbutil.Sample{series1.String(): exp1})
})
t.Run("buckets disappearing", func(t *testing.T) {
h.PositiveSpans[1].Length--
h.PositiveBuckets = h.PositiveBuckets[:len(h.PositiveBuckets)-1]
appendHistogram(series1, 110, h.Copy(), &exp1)
testQuery("foo", "bar1", map[string][]tsdbutil.Sample{series1.String(): exp1})
})
})
t.Run("series starting with float and then getting histograms", func(t *testing.T) {
appendFloat(series2, 100, 100, &exp2)
appendFloat(series2, 101, 101, &exp2)
appendFloat(series2, 102, 102, &exp2)
testQuery("foo", "bar2", map[string][]tsdbutil.Sample{series2.String(): exp2})
h := baseH.Copy()
appendHistogram(series2, 103, h.Copy(), &exp2)
appendHistogram(series2, 104, h.Copy(), &exp2)
appendHistogram(series2, 105, h.Copy(), &exp2)
testQuery("foo", "bar2", map[string][]tsdbutil.Sample{series2.String(): exp2})
// Switching between float and histograms again.
appendFloat(series2, 106, 106, &exp2)
appendFloat(series2, 107, 107, &exp2)
testQuery("foo", "bar2", map[string][]tsdbutil.Sample{series2.String(): exp2})
appendHistogram(series2, 108, h.Copy(), &exp2)
appendHistogram(series2, 109, h.Copy(), &exp2)
testQuery("foo", "bar2", map[string][]tsdbutil.Sample{series2.String(): exp2})
})
t.Run("series starting with histogram and then getting float", func(t *testing.T) {
h := baseH.Copy()
appendHistogram(series3, 101, h.Copy(), &exp3)
appendHistogram(series3, 102, h.Copy(), &exp3)
appendHistogram(series3, 103, h.Copy(), &exp3)
testQuery("foo", "bar3", map[string][]tsdbutil.Sample{series3.String(): exp3})
appendFloat(series3, 104, 100, &exp3)
appendFloat(series3, 105, 101, &exp3)
appendFloat(series3, 106, 102, &exp3)
testQuery("foo", "bar3", map[string][]tsdbutil.Sample{series3.String(): exp3})
// Switching between histogram and float again.
appendHistogram(series3, 107, h.Copy(), &exp3)
appendHistogram(series3, 108, h.Copy(), &exp3)
testQuery("foo", "bar3", map[string][]tsdbutil.Sample{series3.String(): exp3})
appendFloat(series3, 109, 106, &exp3)
appendFloat(series3, 110, 107, &exp3)
testQuery("foo", "bar3", map[string][]tsdbutil.Sample{series3.String(): exp3})
})
t.Run("query mix of histogram and float series", func(t *testing.T) {
// A float only series.
appendFloat(series4, 100, 100, &exp4)
appendFloat(series4, 101, 101, &exp4)
appendFloat(series4, 102, 102, &exp4)
testQuery("foo", "bar.*", map[string][]tsdbutil.Sample{
series1.String(): exp1,
series2.String(): exp2,
series3.String(): exp3,
series4.String(): exp4,
})
})
}
func TestQueryHistogramFromBlocks(t *testing.T) {
minute := func(m int) int64 { return int64(m) * time.Minute.Milliseconds() }
testBlockQuerying := func(t *testing.T, blockSeries ...[]storage.Series) {
t.Helper()
opts := DefaultOptions()
opts.AllowOverlappingBlocks = true
db := openTestDB(t, opts, nil)
t.Cleanup(func() {
require.NoError(t, db.Close())
})
ctx := context.Background()
exp := make(map[string][]tsdbutil.Sample)
for _, series := range blockSeries {
createBlock(t, db.Dir(), series)
for _, s := range series {
key := s.Labels().String()
it := s.Iterator()
slice := exp[key]
for typ := it.Next(); typ != chunkenc.ValNone; typ = it.Next() {
switch typ {
case chunkenc.ValFloat:
ts, v := it.At()
slice = append(slice, sample{t: ts, v: v})
case chunkenc.ValHistogram:
ts, h := it.AtHistogram()
slice = append(slice, sample{t: ts, h: h})
}
}
sort.Slice(slice, func(i, j int) bool {
return slice[i].T() < slice[j].T()
})
exp[key] = slice
}
}
require.Len(t, db.Blocks(), 0)
require.NoError(t, db.reload())
require.Len(t, db.Blocks(), len(blockSeries))
q, err := db.Querier(ctx, math.MinInt64, math.MaxInt64)
require.NoError(t, err)
res := query(t, q, labels.MustNewMatcher(labels.MatchRegexp, "__name__", ".*"))
require.Equal(t, exp, res)
}
t.Run("serial blocks with only histograms", func(t *testing.T) {
testBlockQuerying(t,
genHistogramSeries(10, 5, minute(0), minute(119), minute(1)),
genHistogramSeries(10, 5, minute(120), minute(239), minute(1)),
genHistogramSeries(10, 5, minute(240), minute(359), minute(1)),
)
})
t.Run("overlapping blocks with only histograms", func(t *testing.T) {
testBlockQuerying(t,
genHistogramSeries(10, 5, minute(0), minute(120), minute(3)),
genHistogramSeries(10, 5, minute(1), minute(120), minute(3)),
genHistogramSeries(10, 5, minute(2), minute(120), minute(3)),
)
})
t.Run("serial blocks with mix of histograms and float64", func(t *testing.T) {
testBlockQuerying(t,
genHistogramAndFloatSeries(10, 5, minute(0), minute(60), minute(1)),
genHistogramSeries(10, 5, minute(61), minute(120), minute(1)),
genHistogramAndFloatSeries(10, 5, minute(121), minute(180), minute(1)),
)
})
t.Run("overlapping blocks with mix of histograms and float64", func(t *testing.T) {
testBlockQuerying(t,
genHistogramAndFloatSeries(10, 5, minute(0), minute(60), minute(3)),
genHistogramSeries(10, 5, minute(46), minute(100), minute(3)),
genHistogramAndFloatSeries(10, 5, minute(89), minute(140), minute(3)),
)
})
}

View file

@ -553,7 +553,7 @@ func ValidateHistogram(h *histogram.Histogram) error {
if c := negativeCount + positiveCount; c > h.Count {
return errors.Wrap(
storage.ErrHistogramCountNotBigEnough,
fmt.Sprintf("%d observations found in buckets, but overall count is %d", c, h.Count),
fmt.Sprintf("%d observations found in buckets, but the Count field is %d", c, h.Count),
)
}
@ -712,7 +712,6 @@ func (a *headAppender) Commit() (err error) {
defer a.head.putHistogramBuffer(a.histograms)
defer a.head.putMetadataBuffer(a.metadata)
defer a.head.iso.closeAppend(a.appendID)
total := len(a.samples)
var series *memSeries
for i, s := range a.samples {

View file

@ -3886,7 +3886,7 @@ func TestHistogramValidation(t *testing.T) {
NegativeBuckets: []int64{1},
PositiveBuckets: []int64{1},
},
errMsg: `2 observations found in buckets, but overall count is 0`,
errMsg: `2 observations found in buckets, but the Count field is 0`,
},
}

View file

@ -696,7 +696,6 @@ func (p *populateWithDelChunkSeriesIterator) Next() bool {
if !p.next() {
return false
}
p.curr = p.currChkMeta
if p.currDelIter == nil {
return true

View file

@ -54,14 +54,34 @@ func CreateBlock(series []storage.Series, dir string, chunkRange int64, logger l
ref := storage.SeriesRef(0)
it := s.Iterator()
lset := s.Labels()
for it.Next() == chunkenc.ValFloat {
// TODO(beorn7): Add histogram support.
t, v := it.At()
ref, err = app.Append(ref, lset, t, v)
typ := it.Next()
lastTyp := typ
for ; typ != chunkenc.ValNone; typ = it.Next() {
if lastTyp != typ {
// The behaviour of appender is undefined if samples of different types
// are appended to the same series in a single Commit().
if err = app.Commit(); err != nil {
return "", err
}
app = w.Appender(ctx)
sampleCount = 0
}
switch typ {
case chunkenc.ValFloat:
t, v := it.At()
ref, err = app.Append(ref, lset, t, v)
case chunkenc.ValHistogram:
t, h := it.AtHistogram()
ref, err = app.AppendHistogram(ref, lset, t, h)
default:
return "", fmt.Errorf("unknown sample type %s", typ.String())
}
if err != nil {
return "", err
}
sampleCount++
lastTyp = typ
}
if it.Err() != nil {
return "", it.Err()