mirror of
https://github.com/prometheus/prometheus.git
synced 2025-03-05 20:59:13 -08:00
Update OOO querying tests to include native histogram samples
Signed-off-by: Carrie Edwards <edwrdscarrie@gmail.com> Co-authored by: Jeanette Tan <jeanette.tan@grafana.com>: Co-authored by: György Krajcsovits <gyorgy.krajcsovits@grafana.com>: Co-authored by: Fiona Liao <fiona.liao@grafana.com>:
This commit is contained in:
parent
dd65c3d3fc
commit
d91fc46794
562
tsdb/db_test.go
562
tsdb/db_test.go
|
@ -5017,21 +5017,87 @@ func testOOOQueryAfterRestartWithSnapshotAndRemovedWBL(t *testing.T, scenario sa
|
||||||
verifySamples(90, 109)
|
verifySamples(90, 109)
|
||||||
}
|
}
|
||||||
|
|
||||||
func Test_Querier_OOOQuery(t *testing.T) {
|
func TestQuerierOOOQuery(t *testing.T) {
|
||||||
|
scenarios := map[string]struct {
|
||||||
|
appendFunc func(app storage.Appender, ts int64, counterReset bool) (storage.SeriesRef, error)
|
||||||
|
sampleFunc func(ts int64) chunks.Sample
|
||||||
|
}{
|
||||||
|
"float": {
|
||||||
|
appendFunc: func(app storage.Appender, ts int64, counterReset bool) (storage.SeriesRef, error) {
|
||||||
|
return app.Append(0, labels.FromStrings("foo", "bar1"), ts, float64(ts))
|
||||||
|
},
|
||||||
|
sampleFunc: func(ts int64) chunks.Sample {
|
||||||
|
return sample{t: ts, f: float64(ts)}
|
||||||
|
},
|
||||||
|
},
|
||||||
|
"integer histogram": {
|
||||||
|
appendFunc: func(app storage.Appender, ts int64, counterReset bool) (storage.SeriesRef, error) {
|
||||||
|
h := tsdbutil.GenerateTestHistogram(int(ts))
|
||||||
|
if counterReset {
|
||||||
|
h.CounterResetHint = histogram.CounterReset
|
||||||
|
}
|
||||||
|
return app.AppendHistogram(0, labels.FromStrings("foo", "bar1"), ts, h, nil)
|
||||||
|
},
|
||||||
|
sampleFunc: func(ts int64) chunks.Sample {
|
||||||
|
return sample{t: ts, h: tsdbutil.GenerateTestHistogram(int(ts))}
|
||||||
|
},
|
||||||
|
},
|
||||||
|
"float histogram": {
|
||||||
|
appendFunc: func(app storage.Appender, ts int64, counterReset bool) (storage.SeriesRef, error) {
|
||||||
|
fh := tsdbutil.GenerateTestFloatHistogram(int(ts))
|
||||||
|
if counterReset {
|
||||||
|
fh.CounterResetHint = histogram.CounterReset
|
||||||
|
}
|
||||||
|
return app.AppendHistogram(0, labels.FromStrings("foo", "bar1"), ts, nil, fh)
|
||||||
|
},
|
||||||
|
sampleFunc: func(ts int64) chunks.Sample {
|
||||||
|
return sample{t: ts, fh: tsdbutil.GenerateTestFloatHistogram(int(ts))}
|
||||||
|
},
|
||||||
|
},
|
||||||
|
"integer histogram counter resets": {
|
||||||
|
// Adding counter reset to all histograms means each histogram will have its own chunk.
|
||||||
|
appendFunc: func(app storage.Appender, ts int64, counterReset bool) (storage.SeriesRef, error) {
|
||||||
|
h := tsdbutil.GenerateTestHistogram(int(ts))
|
||||||
|
h.CounterResetHint = histogram.CounterReset // for this scenario, ignore the counterReset argument
|
||||||
|
return app.AppendHistogram(0, labels.FromStrings("foo", "bar1"), ts, h, nil)
|
||||||
|
},
|
||||||
|
sampleFunc: func(ts int64) chunks.Sample {
|
||||||
|
return sample{t: ts, h: tsdbutil.GenerateTestHistogram(int(ts))}
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for name, scenario := range scenarios {
|
||||||
|
t.Run(name, func(t *testing.T) {
|
||||||
|
testQuerierOOOQuery(t, scenario.appendFunc, scenario.sampleFunc)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func testQuerierOOOQuery(t *testing.T,
|
||||||
|
appendFunc func(app storage.Appender, ts int64, counterReset bool) (storage.SeriesRef, error),
|
||||||
|
sampleFunc func(ts int64) chunks.Sample,
|
||||||
|
) {
|
||||||
opts := DefaultOptions()
|
opts := DefaultOptions()
|
||||||
opts.OutOfOrderCapMax = 30
|
opts.OutOfOrderCapMax = 30
|
||||||
opts.OutOfOrderTimeWindow = 24 * time.Hour.Milliseconds()
|
opts.OutOfOrderTimeWindow = 24 * time.Hour.Milliseconds()
|
||||||
|
|
||||||
series1 := labels.FromStrings("foo", "bar1")
|
series1 := labels.FromStrings("foo", "bar1")
|
||||||
|
|
||||||
|
type filterFunc func(t int64) bool
|
||||||
|
defaultFilterFunc := func(t int64) bool { return true }
|
||||||
|
|
||||||
minutes := func(m int64) int64 { return m * time.Minute.Milliseconds() }
|
minutes := func(m int64) int64 { return m * time.Minute.Milliseconds() }
|
||||||
addSample := func(db *DB, fromMins, toMins, queryMinT, queryMaxT int64, expSamples []chunks.Sample) ([]chunks.Sample, int) {
|
addSample := func(db *DB, fromMins, toMins, queryMinT, queryMaxT int64, expSamples []chunks.Sample, filter filterFunc, counterReset bool) ([]chunks.Sample, int) {
|
||||||
app := db.Appender(context.Background())
|
app := db.Appender(context.Background())
|
||||||
totalAppended := 0
|
totalAppended := 0
|
||||||
for min := fromMins; min <= toMins; min += time.Minute.Milliseconds() {
|
for min := fromMins; min <= toMins; min += time.Minute.Milliseconds() {
|
||||||
_, err := app.Append(0, series1, min, float64(min))
|
if !filter(min) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
_, err := appendFunc(app, min, counterReset)
|
||||||
if min >= queryMinT && min <= queryMaxT {
|
if min >= queryMinT && min <= queryMaxT {
|
||||||
expSamples = append(expSamples, sample{t: min, f: float64(min)})
|
expSamples = append(expSamples, sampleFunc(min))
|
||||||
}
|
}
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
totalAppended++
|
totalAppended++
|
||||||
|
@ -5040,49 +5106,101 @@ func Test_Querier_OOOQuery(t *testing.T) {
|
||||||
return expSamples, totalAppended
|
return expSamples, totalAppended
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type sampleBatch struct {
|
||||||
|
minT int64
|
||||||
|
maxT int64
|
||||||
|
filter filterFunc
|
||||||
|
counterReset bool
|
||||||
|
isOOO bool
|
||||||
|
}
|
||||||
|
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
name string
|
name string
|
||||||
queryMinT int64
|
queryMinT int64
|
||||||
queryMaxT int64
|
queryMaxT int64
|
||||||
inOrderMinT int64
|
batches []sampleBatch
|
||||||
inOrderMaxT int64
|
|
||||||
oooMinT int64
|
|
||||||
oooMaxT int64
|
|
||||||
}{
|
}{
|
||||||
{
|
{
|
||||||
name: "query interval covering ooomint and inordermaxt returns all ingested samples",
|
name: "query interval covering ooomint and inordermaxt returns all ingested samples",
|
||||||
queryMinT: minutes(0),
|
queryMinT: minutes(0),
|
||||||
queryMaxT: minutes(200),
|
queryMaxT: minutes(200),
|
||||||
inOrderMinT: minutes(100),
|
batches: []sampleBatch{
|
||||||
inOrderMaxT: minutes(200),
|
{
|
||||||
oooMinT: minutes(0),
|
minT: minutes(100),
|
||||||
oooMaxT: minutes(99),
|
maxT: minutes(200),
|
||||||
|
filter: defaultFilterFunc,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
minT: minutes(0),
|
||||||
|
maxT: minutes(99),
|
||||||
|
filter: defaultFilterFunc,
|
||||||
|
isOOO: true,
|
||||||
|
},
|
||||||
|
},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "partial query interval returns only samples within interval",
|
name: "partial query interval returns only samples within interval",
|
||||||
queryMinT: minutes(20),
|
queryMinT: minutes(20),
|
||||||
queryMaxT: minutes(180),
|
queryMaxT: minutes(180),
|
||||||
inOrderMinT: minutes(100),
|
batches: []sampleBatch{
|
||||||
inOrderMaxT: minutes(200),
|
{
|
||||||
oooMinT: minutes(0),
|
minT: minutes(100),
|
||||||
oooMaxT: minutes(99),
|
maxT: minutes(200),
|
||||||
|
filter: defaultFilterFunc,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
minT: minutes(0),
|
||||||
|
maxT: minutes(99),
|
||||||
|
filter: defaultFilterFunc,
|
||||||
|
isOOO: true,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "alternating OOO batches", // in order: 100-200 normal. out of order first path: 0, 2, 4, ... 98 (no counter reset), second pass: 1, 3, 5, ... 99 (with counter reset)
|
||||||
|
queryMinT: minutes(0),
|
||||||
|
queryMaxT: minutes(200),
|
||||||
|
batches: []sampleBatch{
|
||||||
|
{
|
||||||
|
minT: minutes(100),
|
||||||
|
maxT: minutes(200),
|
||||||
|
filter: defaultFilterFunc,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
minT: minutes(0),
|
||||||
|
maxT: minutes(99),
|
||||||
|
filter: func(t int64) bool { return t%2 == 0 },
|
||||||
|
isOOO: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
minT: minutes(0),
|
||||||
|
maxT: minutes(99),
|
||||||
|
filter: func(t int64) bool { return t%2 == 1 },
|
||||||
|
counterReset: true,
|
||||||
|
isOOO: true,
|
||||||
|
},
|
||||||
|
},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
for _, tc := range tests {
|
for _, tc := range tests {
|
||||||
t.Run(fmt.Sprintf("name=%s", tc.name), func(t *testing.T) {
|
t.Run(fmt.Sprintf("name=%s", tc.name), func(t *testing.T) {
|
||||||
db := openTestDB(t, opts, nil)
|
db := openTestDB(t, opts, nil)
|
||||||
db.DisableCompactions()
|
db.DisableCompactions()
|
||||||
|
db.EnableNativeHistograms()
|
||||||
|
db.EnableOOONativeHistograms()
|
||||||
defer func() {
|
defer func() {
|
||||||
require.NoError(t, db.Close())
|
require.NoError(t, db.Close())
|
||||||
}()
|
}()
|
||||||
|
|
||||||
var expSamples []chunks.Sample
|
var expSamples []chunks.Sample
|
||||||
|
var oooSamples, appendedCount int
|
||||||
|
|
||||||
// Add in-order samples.
|
for _, batch := range tc.batches {
|
||||||
expSamples, _ = addSample(db, tc.inOrderMinT, tc.inOrderMaxT, tc.queryMinT, tc.queryMaxT, expSamples)
|
expSamples, appendedCount = addSample(db, batch.minT, batch.maxT, tc.queryMinT, tc.queryMaxT, expSamples, batch.filter, batch.counterReset)
|
||||||
|
if batch.isOOO {
|
||||||
// Add out-of-order samples.
|
oooSamples += appendedCount
|
||||||
expSamples, oooSamples := addSample(db, tc.oooMinT, tc.oooMaxT, tc.queryMinT, tc.queryMaxT, expSamples)
|
}
|
||||||
|
}
|
||||||
|
|
||||||
sort.Slice(expSamples, func(i, j int) bool {
|
sort.Slice(expSamples, func(i, j int) bool {
|
||||||
return expSamples[i].T() < expSamples[j].T()
|
return expSamples[i].T() < expSamples[j].T()
|
||||||
|
@ -5093,29 +5211,95 @@ func Test_Querier_OOOQuery(t *testing.T) {
|
||||||
defer querier.Close()
|
defer querier.Close()
|
||||||
|
|
||||||
seriesSet := query(t, querier, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar1"))
|
seriesSet := query(t, querier, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar1"))
|
||||||
require.NotNil(t, seriesSet[series1.String()])
|
gotSamples := seriesSet[series1.String()]
|
||||||
|
require.NotNil(t, gotSamples)
|
||||||
require.Len(t, seriesSet, 1)
|
require.Len(t, seriesSet, 1)
|
||||||
require.Equal(t, expSamples, seriesSet[series1.String()])
|
requireEqualSamples(t, series1.String(), expSamples, gotSamples, true)
|
||||||
requireEqualOOOSamples(t, oooSamples, db)
|
requireEqualOOOSamples(t, oooSamples, db)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func Test_ChunkQuerier_OOOQuery(t *testing.T) {
|
func TestChunkQuerierOOOQuery(t *testing.T) {
|
||||||
|
scenarios := map[string]struct {
|
||||||
|
appendFunc func(app storage.Appender, ts int64, counterReset bool) (storage.SeriesRef, error)
|
||||||
|
sampleFunc func(ts int64) chunks.Sample
|
||||||
|
}{
|
||||||
|
"float": {
|
||||||
|
appendFunc: func(app storage.Appender, ts int64, counterReset bool) (storage.SeriesRef, error) {
|
||||||
|
return app.Append(0, labels.FromStrings("foo", "bar1"), ts, float64(ts))
|
||||||
|
},
|
||||||
|
sampleFunc: func(ts int64) chunks.Sample {
|
||||||
|
return sample{t: ts, f: float64(ts)}
|
||||||
|
},
|
||||||
|
},
|
||||||
|
"integer histogram": {
|
||||||
|
appendFunc: func(app storage.Appender, ts int64, counterReset bool) (storage.SeriesRef, error) {
|
||||||
|
h := tsdbutil.GenerateTestHistogram(int(ts))
|
||||||
|
if counterReset {
|
||||||
|
h.CounterResetHint = histogram.CounterReset
|
||||||
|
}
|
||||||
|
return app.AppendHistogram(0, labels.FromStrings("foo", "bar1"), ts, h, nil)
|
||||||
|
},
|
||||||
|
sampleFunc: func(ts int64) chunks.Sample {
|
||||||
|
return sample{t: ts, h: tsdbutil.GenerateTestHistogram(int(ts))}
|
||||||
|
},
|
||||||
|
},
|
||||||
|
"float histogram": {
|
||||||
|
appendFunc: func(app storage.Appender, ts int64, counterReset bool) (storage.SeriesRef, error) {
|
||||||
|
fh := tsdbutil.GenerateTestFloatHistogram(int(ts))
|
||||||
|
if counterReset {
|
||||||
|
fh.CounterResetHint = histogram.CounterReset
|
||||||
|
}
|
||||||
|
return app.AppendHistogram(0, labels.FromStrings("foo", "bar1"), ts, nil, fh)
|
||||||
|
},
|
||||||
|
sampleFunc: func(ts int64) chunks.Sample {
|
||||||
|
return sample{t: ts, fh: tsdbutil.GenerateTestFloatHistogram(int(ts))}
|
||||||
|
},
|
||||||
|
},
|
||||||
|
"integer histogram counter resets": {
|
||||||
|
// Adding counter reset to all histograms means each histogram will have its own chunk.
|
||||||
|
appendFunc: func(app storage.Appender, ts int64, counterReset bool) (storage.SeriesRef, error) {
|
||||||
|
h := tsdbutil.GenerateTestHistogram(int(ts))
|
||||||
|
h.CounterResetHint = histogram.CounterReset // for this scenario, ignore the counterReset argument
|
||||||
|
return app.AppendHistogram(0, labels.FromStrings("foo", "bar1"), ts, h, nil)
|
||||||
|
},
|
||||||
|
sampleFunc: func(ts int64) chunks.Sample {
|
||||||
|
return sample{t: ts, h: tsdbutil.GenerateTestHistogram(int(ts))}
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
for name, scenario := range scenarios {
|
||||||
|
t.Run(name, func(t *testing.T) {
|
||||||
|
testChunkQuerierOOOQuery(t, scenario.appendFunc, scenario.sampleFunc)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func testChunkQuerierOOOQuery(t *testing.T,
|
||||||
|
appendFunc func(app storage.Appender, ts int64, counterReset bool) (storage.SeriesRef, error),
|
||||||
|
sampleFunc func(ts int64) chunks.Sample,
|
||||||
|
) {
|
||||||
opts := DefaultOptions()
|
opts := DefaultOptions()
|
||||||
opts.OutOfOrderCapMax = 30
|
opts.OutOfOrderCapMax = 30
|
||||||
opts.OutOfOrderTimeWindow = 24 * time.Hour.Milliseconds()
|
opts.OutOfOrderTimeWindow = 24 * time.Hour.Milliseconds()
|
||||||
|
|
||||||
series1 := labels.FromStrings("foo", "bar1")
|
series1 := labels.FromStrings("foo", "bar1")
|
||||||
|
|
||||||
|
type filterFunc func(t int64) bool
|
||||||
|
defaultFilterFunc := func(t int64) bool { return true }
|
||||||
|
|
||||||
minutes := func(m int64) int64 { return m * time.Minute.Milliseconds() }
|
minutes := func(m int64) int64 { return m * time.Minute.Milliseconds() }
|
||||||
addSample := func(db *DB, fromMins, toMins, queryMinT, queryMaxT int64, expSamples []chunks.Sample) ([]chunks.Sample, int) {
|
addSample := func(db *DB, fromMins, toMins, queryMinT, queryMaxT int64, expSamples []chunks.Sample, filter filterFunc, counterReset bool) ([]chunks.Sample, int) {
|
||||||
app := db.Appender(context.Background())
|
app := db.Appender(context.Background())
|
||||||
totalAppended := 0
|
totalAppended := 0
|
||||||
for min := fromMins; min <= toMins; min += time.Minute.Milliseconds() {
|
for min := fromMins; min <= toMins; min += time.Minute.Milliseconds() {
|
||||||
_, err := app.Append(0, series1, min, float64(min))
|
if !filter(min) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
_, err := appendFunc(app, min, counterReset)
|
||||||
if min >= queryMinT && min <= queryMaxT {
|
if min >= queryMinT && min <= queryMaxT {
|
||||||
expSamples = append(expSamples, sample{t: min, f: float64(min)})
|
expSamples = append(expSamples, sampleFunc(min))
|
||||||
}
|
}
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
totalAppended++
|
totalAppended++
|
||||||
|
@ -5124,49 +5308,101 @@ func Test_ChunkQuerier_OOOQuery(t *testing.T) {
|
||||||
return expSamples, totalAppended
|
return expSamples, totalAppended
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type sampleBatch struct {
|
||||||
|
minT int64
|
||||||
|
maxT int64
|
||||||
|
filter filterFunc
|
||||||
|
counterReset bool
|
||||||
|
isOOO bool
|
||||||
|
}
|
||||||
|
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
name string
|
name string
|
||||||
queryMinT int64
|
queryMinT int64
|
||||||
queryMaxT int64
|
queryMaxT int64
|
||||||
inOrderMinT int64
|
batches []sampleBatch
|
||||||
inOrderMaxT int64
|
|
||||||
oooMinT int64
|
|
||||||
oooMaxT int64
|
|
||||||
}{
|
}{
|
||||||
{
|
{
|
||||||
name: "query interval covering ooomint and inordermaxt returns all ingested samples",
|
name: "query interval covering ooomint and inordermaxt returns all ingested samples",
|
||||||
queryMinT: minutes(0),
|
queryMinT: minutes(0),
|
||||||
queryMaxT: minutes(200),
|
queryMaxT: minutes(200),
|
||||||
inOrderMinT: minutes(100),
|
batches: []sampleBatch{
|
||||||
inOrderMaxT: minutes(200),
|
{
|
||||||
oooMinT: minutes(0),
|
minT: minutes(100),
|
||||||
oooMaxT: minutes(99),
|
maxT: minutes(200),
|
||||||
|
filter: defaultFilterFunc,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
minT: minutes(0),
|
||||||
|
maxT: minutes(99),
|
||||||
|
filter: defaultFilterFunc,
|
||||||
|
isOOO: true,
|
||||||
|
},
|
||||||
|
},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "partial query interval returns only samples within interval",
|
name: "partial query interval returns only samples within interval",
|
||||||
queryMinT: minutes(20),
|
queryMinT: minutes(20),
|
||||||
queryMaxT: minutes(180),
|
queryMaxT: minutes(180),
|
||||||
inOrderMinT: minutes(100),
|
batches: []sampleBatch{
|
||||||
inOrderMaxT: minutes(200),
|
{
|
||||||
oooMinT: minutes(0),
|
minT: minutes(100),
|
||||||
oooMaxT: minutes(99),
|
maxT: minutes(200),
|
||||||
|
filter: defaultFilterFunc,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
minT: minutes(0),
|
||||||
|
maxT: minutes(99),
|
||||||
|
filter: defaultFilterFunc,
|
||||||
|
isOOO: true,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "alternating OOO batches", // in order: 100-200 normal. out of order first path: 0, 2, 4, ... 98 (no counter reset), second pass: 1, 3, 5, ... 99 (with counter reset)
|
||||||
|
queryMinT: minutes(0),
|
||||||
|
queryMaxT: minutes(200),
|
||||||
|
batches: []sampleBatch{
|
||||||
|
{
|
||||||
|
minT: minutes(100),
|
||||||
|
maxT: minutes(200),
|
||||||
|
filter: defaultFilterFunc,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
minT: minutes(0),
|
||||||
|
maxT: minutes(99),
|
||||||
|
filter: func(t int64) bool { return t%2 == 0 },
|
||||||
|
isOOO: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
minT: minutes(0),
|
||||||
|
maxT: minutes(99),
|
||||||
|
filter: func(t int64) bool { return t%2 == 1 },
|
||||||
|
counterReset: true,
|
||||||
|
isOOO: true,
|
||||||
|
},
|
||||||
|
},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
for _, tc := range tests {
|
for _, tc := range tests {
|
||||||
t.Run(fmt.Sprintf("name=%s", tc.name), func(t *testing.T) {
|
t.Run(fmt.Sprintf("name=%s", tc.name), func(t *testing.T) {
|
||||||
db := openTestDB(t, opts, nil)
|
db := openTestDB(t, opts, nil)
|
||||||
db.DisableCompactions()
|
db.DisableCompactions()
|
||||||
|
db.EnableNativeHistograms()
|
||||||
|
db.EnableOOONativeHistograms()
|
||||||
defer func() {
|
defer func() {
|
||||||
require.NoError(t, db.Close())
|
require.NoError(t, db.Close())
|
||||||
}()
|
}()
|
||||||
|
|
||||||
var expSamples []chunks.Sample
|
var expSamples []chunks.Sample
|
||||||
|
var oooSamples, appendedCount int
|
||||||
|
|
||||||
// Add in-order samples.
|
for _, batch := range tc.batches {
|
||||||
expSamples, _ = addSample(db, tc.inOrderMinT, tc.inOrderMaxT, tc.queryMinT, tc.queryMaxT, expSamples)
|
expSamples, appendedCount = addSample(db, batch.minT, batch.maxT, tc.queryMinT, tc.queryMaxT, expSamples, batch.filter, batch.counterReset)
|
||||||
|
if batch.isOOO {
|
||||||
// Add out-of-order samples.
|
oooSamples += appendedCount
|
||||||
expSamples, oooSamples := addSample(db, tc.oooMinT, tc.oooMaxT, tc.queryMinT, tc.queryMaxT, expSamples)
|
}
|
||||||
|
}
|
||||||
|
|
||||||
sort.Slice(expSamples, func(i, j int) bool {
|
sort.Slice(expSamples, func(i, j int) bool {
|
||||||
return expSamples[i].T() < expSamples[j].T()
|
return expSamples[i].T() < expSamples[j].T()
|
||||||
|
@ -5183,12 +5419,186 @@ func Test_ChunkQuerier_OOOQuery(t *testing.T) {
|
||||||
var gotSamples []chunks.Sample
|
var gotSamples []chunks.Sample
|
||||||
for _, chunk := range chks[series1.String()] {
|
for _, chunk := range chks[series1.String()] {
|
||||||
it := chunk.Chunk.Iterator(nil)
|
it := chunk.Chunk.Iterator(nil)
|
||||||
for it.Next() == chunkenc.ValFloat {
|
smpls, err := storage.ExpandSamples(it, newSample)
|
||||||
ts, v := it.At()
|
require.NoError(t, err)
|
||||||
gotSamples = append(gotSamples, sample{t: ts, f: v})
|
gotSamples = append(gotSamples, smpls...)
|
||||||
|
require.NoError(t, it.Err())
|
||||||
|
}
|
||||||
|
requireEqualSamples(t, series1.String(), expSamples, gotSamples, true)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// This test verifies the counter reset headers for in-order and out-of-order samples upon ingestion.
|
||||||
|
// Note that when the counter reset(s) occur in OOO samples, the header is set to UnknownCounterReset
|
||||||
|
// rather than CounterReset. This is because with OOO native histogram samples, it cannot be definitely
|
||||||
|
// determined if a counter reset occurred because the samples are not consecutive, and another sample
|
||||||
|
// could potentially come in that would change the status of the header. In this case, the UnknownCounterReset
|
||||||
|
// headers would be re-checked at query time and updated as needed. However, this test is checking the counter
|
||||||
|
// reset headers at the time of storage.
|
||||||
|
func TestOOONativeHistogramsWithCounterResets(t *testing.T) {
|
||||||
|
for name, scenario := range sampleTypeScenarios {
|
||||||
|
t.Run(name, func(t *testing.T) {
|
||||||
|
if name == intHistogram || name == floatHistogram {
|
||||||
|
testOOONativeHistogramsWithCounterResets(t, scenario)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func testOOONativeHistogramsWithCounterResets(t *testing.T, scenario sampleTypeScenario) {
|
||||||
|
opts := DefaultOptions()
|
||||||
|
opts.OutOfOrderCapMax = 30
|
||||||
|
opts.OutOfOrderTimeWindow = 24 * time.Hour.Milliseconds()
|
||||||
|
|
||||||
|
type resetFunc func(v int64) bool
|
||||||
|
defaultResetFunc := func(v int64) bool { return false }
|
||||||
|
|
||||||
|
lbls := labels.FromStrings("foo", "bar1")
|
||||||
|
minutes := func(m int64) int64 { return m * time.Minute.Milliseconds() }
|
||||||
|
|
||||||
|
type sampleBatch struct {
|
||||||
|
from int64
|
||||||
|
until int64
|
||||||
|
shouldReset resetFunc
|
||||||
|
expCounterResetHints []histogram.CounterResetHint
|
||||||
|
}
|
||||||
|
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
queryMin int64
|
||||||
|
queryMax int64
|
||||||
|
batches []sampleBatch
|
||||||
|
expectedSamples []chunks.Sample
|
||||||
|
}{
|
||||||
|
//{
|
||||||
|
// name: "Counter reset within in-order samples",
|
||||||
|
// queryMin: minutes(40),
|
||||||
|
// queryMax: minutes(55),
|
||||||
|
// batches: []sampleBatch{
|
||||||
|
// // In-order samples
|
||||||
|
// {
|
||||||
|
// from: 40,
|
||||||
|
// until: 50,
|
||||||
|
// shouldReset: func(v int64) bool {
|
||||||
|
// return v == 45
|
||||||
|
// },
|
||||||
|
// expCounterResetHints: []histogram.CounterResetHint{histogram.UnknownCounterReset, histogram.NotCounterReset, histogram.NotCounterReset, histogram.NotCounterReset, histogram.NotCounterReset, histogram.CounterReset, histogram.NotCounterReset, histogram.NotCounterReset, histogram.NotCounterReset, histogram.NotCounterReset},
|
||||||
|
// },
|
||||||
|
// },
|
||||||
|
//},
|
||||||
|
//{
|
||||||
|
// name: "Counter reset right at beginning of OOO samples",
|
||||||
|
// queryMin: minutes(40),
|
||||||
|
// queryMax: minutes(55),
|
||||||
|
// batches: []sampleBatch{
|
||||||
|
// // In-order samples
|
||||||
|
// {
|
||||||
|
// from: 40,
|
||||||
|
// until: 45,
|
||||||
|
// shouldReset: defaultResetFunc,
|
||||||
|
// expCounterResetHints: []histogram.CounterResetHint{histogram.UnknownCounterReset, histogram.NotCounterReset, histogram.NotCounterReset, histogram.NotCounterReset, histogram.NotCounterReset},
|
||||||
|
// },
|
||||||
|
// {
|
||||||
|
// from: 50,
|
||||||
|
// until: 55,
|
||||||
|
// shouldReset: defaultResetFunc,
|
||||||
|
// expCounterResetHints: []histogram.CounterResetHint{histogram.UnknownCounterReset, histogram.NotCounterReset, histogram.NotCounterReset, histogram.NotCounterReset, histogram.NotCounterReset},
|
||||||
|
// },
|
||||||
|
// // OOO samples
|
||||||
|
// {
|
||||||
|
// from: 45,
|
||||||
|
// until: 50,
|
||||||
|
// shouldReset: func(v int64) bool {
|
||||||
|
// return v == 45
|
||||||
|
// },
|
||||||
|
// expCounterResetHints: []histogram.CounterResetHint{histogram.UnknownCounterReset, histogram.NotCounterReset, histogram.NotCounterReset, histogram.NotCounterReset, histogram.NotCounterReset},
|
||||||
|
// },
|
||||||
|
// },
|
||||||
|
//},
|
||||||
|
{
|
||||||
|
name: "Counter resets in both in-order and OOO samples",
|
||||||
|
queryMin: minutes(40),
|
||||||
|
queryMax: minutes(55),
|
||||||
|
batches: []sampleBatch{
|
||||||
|
// In-order samples
|
||||||
|
{
|
||||||
|
from: 40,
|
||||||
|
until: 45,
|
||||||
|
shouldReset: func(v int64) bool {
|
||||||
|
return v == 44
|
||||||
|
},
|
||||||
|
expCounterResetHints: []histogram.CounterResetHint{histogram.UnknownCounterReset, histogram.NotCounterReset, histogram.NotCounterReset, histogram.NotCounterReset, histogram.CounterReset},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
from: 50,
|
||||||
|
until: 55,
|
||||||
|
shouldReset: defaultResetFunc,
|
||||||
|
expCounterResetHints: []histogram.CounterResetHint{histogram.UnknownCounterReset, histogram.NotCounterReset, histogram.NotCounterReset, histogram.NotCounterReset, histogram.NotCounterReset},
|
||||||
|
},
|
||||||
|
// OOO samples
|
||||||
|
{
|
||||||
|
from: 45,
|
||||||
|
until: 50,
|
||||||
|
shouldReset: func(v int64) bool {
|
||||||
|
return v == 49
|
||||||
|
},
|
||||||
|
expCounterResetHints: []histogram.CounterResetHint{histogram.UnknownCounterReset, histogram.NotCounterReset, histogram.NotCounterReset, histogram.NotCounterReset, histogram.UnknownCounterReset},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
for _, tc := range tests {
|
||||||
|
t.Run(fmt.Sprintf("name=%s", tc.name), func(t *testing.T) {
|
||||||
|
db := openTestDB(t, opts, nil)
|
||||||
|
db.DisableCompactions()
|
||||||
|
db.EnableOOONativeHistograms()
|
||||||
|
defer func() {
|
||||||
|
require.NoError(t, db.Close())
|
||||||
|
}()
|
||||||
|
|
||||||
|
app := db.Appender(context.Background())
|
||||||
|
|
||||||
|
expSamples := make(map[string][]chunks.Sample)
|
||||||
|
|
||||||
|
for _, batch := range tc.batches {
|
||||||
|
j := batch.from
|
||||||
|
smplIdx := 0
|
||||||
|
for i := batch.from; i < batch.until; i++ {
|
||||||
|
resetCount := batch.shouldReset(i)
|
||||||
|
if resetCount {
|
||||||
|
j = 0
|
||||||
|
}
|
||||||
|
_, s, err := scenario.appendFunc(app, lbls, minutes(i), j)
|
||||||
|
require.NoError(t, err)
|
||||||
|
if s.Type() == chunkenc.ValHistogram {
|
||||||
|
s.H().CounterResetHint = batch.expCounterResetHints[smplIdx]
|
||||||
|
} else if s.Type() == chunkenc.ValFloatHistogram {
|
||||||
|
s.FH().CounterResetHint = batch.expCounterResetHints[smplIdx]
|
||||||
|
}
|
||||||
|
expSamples[lbls.String()] = append(expSamples[lbls.String()], s)
|
||||||
|
j++
|
||||||
|
smplIdx++
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
require.Equal(t, expSamples, gotSamples)
|
|
||||||
|
require.NoError(t, app.Commit())
|
||||||
|
|
||||||
|
for k, v := range expSamples {
|
||||||
|
sort.Slice(v, func(i, j int) bool {
|
||||||
|
return v[i].T() < v[j].T()
|
||||||
|
})
|
||||||
|
expSamples[k] = v
|
||||||
|
}
|
||||||
|
|
||||||
|
querier, err := db.Querier(tc.queryMin, tc.queryMax)
|
||||||
|
require.NoError(t, err)
|
||||||
|
defer querier.Close()
|
||||||
|
|
||||||
|
seriesSet := query(t, querier, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar1"))
|
||||||
|
require.NotNil(t, seriesSet[lbls.String()])
|
||||||
|
require.Len(t, seriesSet, 1)
|
||||||
|
requireEqualSeries(t, expSamples, seriesSet, false)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -6878,23 +7288,11 @@ func TestQueryHistogramFromBlocksWithCompaction(t *testing.T) {
|
||||||
|
|
||||||
for _, s := range series {
|
for _, s := range series {
|
||||||
key := s.Labels().String()
|
key := s.Labels().String()
|
||||||
it = s.Iterator(it)
|
|
||||||
slice := exp[key]
|
slice := exp[key]
|
||||||
for typ := it.Next(); typ != chunkenc.ValNone; typ = it.Next() {
|
it = s.Iterator(it)
|
||||||
switch typ {
|
smpls, err := storage.ExpandSamples(it, nil)
|
||||||
case chunkenc.ValFloat:
|
require.NoError(t, err)
|
||||||
ts, v := it.At()
|
slice = append(slice, smpls...)
|
||||||
slice = append(slice, sample{t: ts, f: v})
|
|
||||||
case chunkenc.ValHistogram:
|
|
||||||
ts, h := it.AtHistogram(nil)
|
|
||||||
slice = append(slice, sample{t: ts, h: h})
|
|
||||||
case chunkenc.ValFloatHistogram:
|
|
||||||
ts, h := it.AtFloatHistogram(nil)
|
|
||||||
slice = append(slice, sample{t: ts, fh: h})
|
|
||||||
default:
|
|
||||||
t.Fatalf("unexpected sample value type %d", typ)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
sort.Slice(slice, func(i, j int) bool {
|
sort.Slice(slice, func(i, j int) bool {
|
||||||
return slice[i].T() < slice[j].T()
|
return slice[i].T() < slice[j].T()
|
||||||
})
|
})
|
||||||
|
|
|
@ -14,6 +14,7 @@
|
||||||
package tsdb
|
package tsdb
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"github.com/prometheus/prometheus/tsdb/tsdbutil"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
prom_testutil "github.com/prometheus/client_golang/prometheus/testutil"
|
prom_testutil "github.com/prometheus/client_golang/prometheus/testutil"
|
||||||
|
@ -27,7 +28,11 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
float = "float"
|
float = "float"
|
||||||
|
intHistogram = "integer histogram"
|
||||||
|
floatHistogram = "float histogram"
|
||||||
|
gaugeIntHistogram = "gauge int histogram"
|
||||||
|
gaugeFloatHistogram = "gauge float histogram"
|
||||||
)
|
)
|
||||||
|
|
||||||
type testValue struct {
|
type testValue struct {
|
||||||
|
@ -42,7 +47,6 @@ type sampleTypeScenario struct {
|
||||||
sampleFunc func(ts, value int64) sample
|
sampleFunc func(ts, value int64) sample
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: native histogram sample types will be added as part of out-of-order native histogram support; see #11220.
|
|
||||||
var sampleTypeScenarios = map[string]sampleTypeScenario{
|
var sampleTypeScenarios = map[string]sampleTypeScenario{
|
||||||
float: {
|
float: {
|
||||||
sampleType: sampleMetricTypeFloat,
|
sampleType: sampleMetricTypeFloat,
|
||||||
|
@ -55,50 +59,50 @@ var sampleTypeScenarios = map[string]sampleTypeScenario{
|
||||||
return sample{t: ts, f: float64(value)}
|
return sample{t: ts, f: float64(value)}
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
// intHistogram: {
|
intHistogram: {
|
||||||
// sampleType: sampleMetricTypeHistogram,
|
sampleType: sampleMetricTypeHistogram,
|
||||||
// appendFunc: func(appender storage.Appender, lbls labels.Labels, ts, value int64) (storage.SeriesRef, sample, error) {
|
appendFunc: func(appender storage.Appender, lbls labels.Labels, ts, value int64) (storage.SeriesRef, sample, error) {
|
||||||
// s := sample{t: ts, h: tsdbutil.GenerateTestHistogram(int(value))}
|
s := sample{t: ts, h: tsdbutil.GenerateTestHistogram(int(value))}
|
||||||
// ref, err := appender.AppendHistogram(0, lbls, ts, s.h, nil)
|
ref, err := appender.AppendHistogram(0, lbls, ts, s.h, nil)
|
||||||
// return ref, s, err
|
return ref, s, err
|
||||||
// },
|
},
|
||||||
// sampleFunc: func(ts, value int64) sample {
|
sampleFunc: func(ts, value int64) sample {
|
||||||
// return sample{t: ts, h: tsdbutil.GenerateTestHistogram(int(value))}
|
return sample{t: ts, h: tsdbutil.GenerateTestHistogram(int(value))}
|
||||||
// },
|
},
|
||||||
// },
|
},
|
||||||
// floatHistogram: {
|
floatHistogram: {
|
||||||
// sampleType: sampleMetricTypeHistogram,
|
sampleType: sampleMetricTypeHistogram,
|
||||||
// appendFunc: func(appender storage.Appender, lbls labels.Labels, ts, value int64) (storage.SeriesRef, sample, error) {
|
appendFunc: func(appender storage.Appender, lbls labels.Labels, ts, value int64) (storage.SeriesRef, sample, error) {
|
||||||
// s := sample{t: ts, fh: tsdbutil.GenerateTestFloatHistogram(int(value))}
|
s := sample{t: ts, fh: tsdbutil.GenerateTestFloatHistogram(int(value))}
|
||||||
// ref, err := appender.AppendHistogram(0, lbls, ts, nil, s.fh)
|
ref, err := appender.AppendHistogram(0, lbls, ts, nil, s.fh)
|
||||||
// return ref, s, err
|
return ref, s, err
|
||||||
// },
|
},
|
||||||
// sampleFunc: func(ts, value int64) sample {
|
sampleFunc: func(ts, value int64) sample {
|
||||||
// return sample{t: ts, fh: tsdbutil.GenerateTestFloatHistogram(int(value))}
|
return sample{t: ts, fh: tsdbutil.GenerateTestFloatHistogram(int(value))}
|
||||||
// },
|
},
|
||||||
// },
|
},
|
||||||
// gaugeIntHistogram: {
|
gaugeIntHistogram: {
|
||||||
// sampleType: sampleMetricTypeHistogram,
|
sampleType: sampleMetricTypeHistogram,
|
||||||
// appendFunc: func(appender storage.Appender, lbls labels.Labels, ts, value int64) (storage.SeriesRef, sample, error) {
|
appendFunc: func(appender storage.Appender, lbls labels.Labels, ts, value int64) (storage.SeriesRef, sample, error) {
|
||||||
// s := sample{t: ts, h: tsdbutil.GenerateTestGaugeHistogram(int(value))}
|
s := sample{t: ts, h: tsdbutil.GenerateTestGaugeHistogram(int(value))}
|
||||||
// ref, err := appender.AppendHistogram(0, lbls, ts, s.h, nil)
|
ref, err := appender.AppendHistogram(0, lbls, ts, s.h, nil)
|
||||||
// return ref, s, err
|
return ref, s, err
|
||||||
// },
|
},
|
||||||
// sampleFunc: func(ts, value int64) sample {
|
sampleFunc: func(ts, value int64) sample {
|
||||||
// return sample{t: ts, h: tsdbutil.GenerateTestGaugeHistogram(int(value))}
|
return sample{t: ts, h: tsdbutil.GenerateTestGaugeHistogram(int(value))}
|
||||||
// },
|
},
|
||||||
// },
|
},
|
||||||
// gaugeFloatHistogram: {
|
gaugeFloatHistogram: {
|
||||||
// sampleType: sampleMetricTypeHistogram,
|
sampleType: sampleMetricTypeHistogram,
|
||||||
// appendFunc: func(appender storage.Appender, lbls labels.Labels, ts, value int64) (storage.SeriesRef, sample, error) {
|
appendFunc: func(appender storage.Appender, lbls labels.Labels, ts, value int64) (storage.SeriesRef, sample, error) {
|
||||||
// s := sample{t: ts, fh: tsdbutil.GenerateTestGaugeFloatHistogram(int(value))}
|
s := sample{t: ts, fh: tsdbutil.GenerateTestGaugeFloatHistogram(int(value))}
|
||||||
// ref, err := appender.AppendHistogram(0, lbls, ts, nil, s.fh)
|
ref, err := appender.AppendHistogram(0, lbls, ts, nil, s.fh)
|
||||||
// return ref, s, err
|
return ref, s, err
|
||||||
// },
|
},
|
||||||
// sampleFunc: func(ts, value int64) sample {
|
sampleFunc: func(ts, value int64) sample {
|
||||||
// return sample{t: ts, fh: tsdbutil.GenerateTestGaugeFloatHistogram(int(value))}
|
return sample{t: ts, fh: tsdbutil.GenerateTestGaugeFloatHistogram(int(value))}
|
||||||
// },
|
},
|
||||||
// },
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
// requireEqualSeries checks that the actual series are equal to the expected ones. It ignores the counter reset hints for histograms.
|
// requireEqualSeries checks that the actual series are equal to the expected ones. It ignores the counter reset hints for histograms.
|
||||||
|
|
Loading…
Reference in a new issue