Merge pull request #13699 from prometheus/cedwards/tsdb-test-helpers

Refactor tsdb tests to use test util functions to test different sample types
This commit is contained in:
Carrie Edwards 2024-07-03 09:50:29 -07:00 committed by GitHub
commit ba948e94fb
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 670 additions and 353 deletions

View file

@ -4495,6 +4495,14 @@ func TestMetadataAssertInMemoryData(t *testing.T) {
//
// are not included in this compaction.
func TestOOOCompaction(t *testing.T) {
for name, scenario := range sampleTypeScenarios {
t.Run(name, func(t *testing.T) {
testOOOCompaction(t, scenario)
})
}
}
func testOOOCompaction(t *testing.T, scenario sampleTypeScenario) {
dir := t.TempDir()
ctx := context.Background()
@ -4516,9 +4524,9 @@ func TestOOOCompaction(t *testing.T) {
app := db.Appender(context.Background())
for min := fromMins; min <= toMins; min++ {
ts := min * time.Minute.Milliseconds()
_, err := app.Append(0, series1, ts, float64(ts))
_, _, err := scenario.appendFunc(app, series1, ts, ts)
require.NoError(t, err)
_, err = app.Append(0, series2, ts, float64(2*ts))
_, _, err = scenario.appendFunc(app, series2, ts, 2*ts)
require.NoError(t, err)
}
require.NoError(t, app.Commit())
@ -4551,8 +4559,8 @@ func TestOOOCompaction(t *testing.T) {
fromMins, toMins := r[0], r[1]
for min := fromMins; min <= toMins; min++ {
ts := min * time.Minute.Milliseconds()
series1Samples = append(series1Samples, sample{ts, float64(ts), nil, nil})
series2Samples = append(series2Samples, sample{ts, float64(2 * ts), nil, nil})
series1Samples = append(series1Samples, scenario.sampleFunc(ts, ts))
series2Samples = append(series2Samples, scenario.sampleFunc(ts, 2*ts))
}
}
expRes := map[string][]chunks.Sample{
@ -4564,7 +4572,7 @@ func TestOOOCompaction(t *testing.T) {
require.NoError(t, err)
actRes := query(t, q, labels.MustNewMatcher(labels.MatchRegexp, "foo", "bar.*"))
require.Equal(t, expRes, actRes)
requireEqualSeries(t, expRes, actRes, true)
}
verifyDBSamples() // Before any compaction.
@ -4619,8 +4627,8 @@ func TestOOOCompaction(t *testing.T) {
series2Samples := make([]chunks.Sample, 0, toMins-fromMins+1)
for min := fromMins; min <= toMins; min++ {
ts := min * time.Minute.Milliseconds()
series1Samples = append(series1Samples, sample{ts, float64(ts), nil, nil})
series2Samples = append(series2Samples, sample{ts, float64(2 * ts), nil, nil})
series1Samples = append(series1Samples, scenario.sampleFunc(ts, ts))
series2Samples = append(series2Samples, scenario.sampleFunc(ts, 2*ts))
}
expRes := map[string][]chunks.Sample{
series1.String(): series1Samples,
@ -4631,7 +4639,7 @@ func TestOOOCompaction(t *testing.T) {
require.NoError(t, err)
actRes := query(t, q, labels.MustNewMatcher(labels.MatchRegexp, "foo", "bar.*"))
require.Equal(t, expRes, actRes)
requireEqualSeries(t, expRes, actRes, true)
}
// Checking for expected data in the blocks.
@ -4675,6 +4683,14 @@ func TestOOOCompaction(t *testing.T) {
// TestOOOCompactionWithNormalCompaction tests if OOO compaction is performed
// when the normal head's compaction is done.
func TestOOOCompactionWithNormalCompaction(t *testing.T) {
for name, scenario := range sampleTypeScenarios {
t.Run(name, func(t *testing.T) {
testOOOCompactionWithNormalCompaction(t, scenario)
})
}
}
func testOOOCompactionWithNormalCompaction(t *testing.T, scenario sampleTypeScenario) {
dir := t.TempDir()
ctx := context.Background()
@ -4696,9 +4712,9 @@ func TestOOOCompactionWithNormalCompaction(t *testing.T) {
app := db.Appender(context.Background())
for min := fromMins; min <= toMins; min++ {
ts := min * time.Minute.Milliseconds()
_, err := app.Append(0, series1, ts, float64(ts))
_, _, err := scenario.appendFunc(app, series1, ts, ts)
require.NoError(t, err)
_, err = app.Append(0, series2, ts, float64(2*ts))
_, _, err = scenario.appendFunc(app, series2, ts, 2*ts)
require.NoError(t, err)
}
require.NoError(t, app.Commit())
@ -4751,8 +4767,8 @@ func TestOOOCompactionWithNormalCompaction(t *testing.T) {
series2Samples := make([]chunks.Sample, 0, toMins-fromMins+1)
for min := fromMins; min <= toMins; min++ {
ts := min * time.Minute.Milliseconds()
series1Samples = append(series1Samples, sample{ts, float64(ts), nil, nil})
series2Samples = append(series2Samples, sample{ts, float64(2 * ts), nil, nil})
series1Samples = append(series1Samples, scenario.sampleFunc(ts, ts))
series2Samples = append(series2Samples, scenario.sampleFunc(ts, 2*ts))
}
expRes := map[string][]chunks.Sample{
series1.String(): series1Samples,
@ -4763,7 +4779,7 @@ func TestOOOCompactionWithNormalCompaction(t *testing.T) {
require.NoError(t, err)
actRes := query(t, q, labels.MustNewMatcher(labels.MatchRegexp, "foo", "bar.*"))
require.Equal(t, expRes, actRes)
requireEqualSeries(t, expRes, actRes, true)
}
// Checking for expected data in the blocks.
@ -4775,6 +4791,14 @@ func TestOOOCompactionWithNormalCompaction(t *testing.T) {
// configured to not have wal and wbl but its able to compact both the in-order
// and out-of-order head.
func TestOOOCompactionWithDisabledWriteLog(t *testing.T) {
for name, scenario := range sampleTypeScenarios {
t.Run(name, func(t *testing.T) {
testOOOCompactionWithDisabledWriteLog(t, scenario)
})
}
}
func testOOOCompactionWithDisabledWriteLog(t *testing.T, scenario sampleTypeScenario) {
dir := t.TempDir()
ctx := context.Background()
@ -4797,9 +4821,9 @@ func TestOOOCompactionWithDisabledWriteLog(t *testing.T) {
app := db.Appender(context.Background())
for min := fromMins; min <= toMins; min++ {
ts := min * time.Minute.Milliseconds()
_, err := app.Append(0, series1, ts, float64(ts))
_, _, err := scenario.appendFunc(app, series1, ts, ts)
require.NoError(t, err)
_, err = app.Append(0, series2, ts, float64(2*ts))
_, _, err = scenario.appendFunc(app, series2, ts, 2*ts)
require.NoError(t, err)
}
require.NoError(t, app.Commit())
@ -4852,8 +4876,8 @@ func TestOOOCompactionWithDisabledWriteLog(t *testing.T) {
series2Samples := make([]chunks.Sample, 0, toMins-fromMins+1)
for min := fromMins; min <= toMins; min++ {
ts := min * time.Minute.Milliseconds()
series1Samples = append(series1Samples, sample{ts, float64(ts), nil, nil})
series2Samples = append(series2Samples, sample{ts, float64(2 * ts), nil, nil})
series1Samples = append(series1Samples, scenario.sampleFunc(ts, ts))
series2Samples = append(series2Samples, scenario.sampleFunc(ts, 2*ts))
}
expRes := map[string][]chunks.Sample{
series1.String(): series1Samples,
@ -4864,7 +4888,7 @@ func TestOOOCompactionWithDisabledWriteLog(t *testing.T) {
require.NoError(t, err)
actRes := query(t, q, labels.MustNewMatcher(labels.MatchRegexp, "foo", "bar.*"))
require.Equal(t, expRes, actRes)
requireEqualSeries(t, expRes, actRes, true)
}
// Checking for expected data in the blocks.
@ -4876,6 +4900,14 @@ func TestOOOCompactionWithDisabledWriteLog(t *testing.T) {
// missing after a restart while snapshot was enabled, but the query still returns the right
// data from the mmap chunks.
func TestOOOQueryAfterRestartWithSnapshotAndRemovedWBL(t *testing.T) {
for name, scenario := range sampleTypeScenarios {
t.Run(name, func(t *testing.T) {
testOOOQueryAfterRestartWithSnapshotAndRemovedWBL(t, scenario)
})
}
}
func testOOOQueryAfterRestartWithSnapshotAndRemovedWBL(t *testing.T, scenario sampleTypeScenario) {
dir := t.TempDir()
ctx := context.Background()
@ -4898,9 +4930,9 @@ func TestOOOQueryAfterRestartWithSnapshotAndRemovedWBL(t *testing.T) {
app := db.Appender(context.Background())
for min := fromMins; min <= toMins; min++ {
ts := min * time.Minute.Milliseconds()
_, err := app.Append(0, series1, ts, float64(ts))
_, _, err := scenario.appendFunc(app, series1, ts, ts)
require.NoError(t, err)
_, err = app.Append(0, series2, ts, float64(2*ts))
_, _, err = scenario.appendFunc(app, series2, ts, 2*ts)
require.NoError(t, err)
}
require.NoError(t, app.Commit())
@ -4946,8 +4978,8 @@ func TestOOOQueryAfterRestartWithSnapshotAndRemovedWBL(t *testing.T) {
series2Samples := make([]chunks.Sample, 0, toMins-fromMins+1)
for min := fromMins; min <= toMins; min++ {
ts := min * time.Minute.Milliseconds()
series1Samples = append(series1Samples, sample{ts, float64(ts), nil, nil})
series2Samples = append(series2Samples, sample{ts, float64(2 * ts), nil, nil})
series1Samples = append(series1Samples, scenario.sampleFunc(ts, ts))
series2Samples = append(series2Samples, scenario.sampleFunc(ts, ts*2))
}
expRes := map[string][]chunks.Sample{
series1.String(): series1Samples,
@ -4958,7 +4990,7 @@ func TestOOOQueryAfterRestartWithSnapshotAndRemovedWBL(t *testing.T) {
require.NoError(t, err)
actRes := query(t, q, labels.MustNewMatcher(labels.MatchRegexp, "foo", "bar.*"))
require.Equal(t, expRes, actRes)
requireEqualSeries(t, expRes, actRes, true)
}
// Checking for expected ooo data from mmap chunks.
@ -5159,6 +5191,14 @@ func Test_ChunkQuerier_OOOQuery(t *testing.T) {
}
func TestOOOAppendAndQuery(t *testing.T) {
for name, scenario := range sampleTypeScenarios {
t.Run(name, func(t *testing.T) {
testOOOAppendAndQuery(t, scenario)
})
}
}
func testOOOAppendAndQuery(t *testing.T, scenario sampleTypeScenario) {
opts := DefaultOptions()
opts.OutOfOrderCapMax = 30
opts.OutOfOrderTimeWindow = 4 * time.Hour.Milliseconds()
@ -5180,13 +5220,13 @@ func TestOOOAppendAndQuery(t *testing.T) {
key := lbls.String()
from, to := minutes(fromMins), minutes(toMins)
for min := from; min <= to; min += time.Minute.Milliseconds() {
val := rand.Float64()
_, err := app.Append(0, lbls, min, val)
val := rand.Intn(1000)
_, s, err := scenario.appendFunc(app, lbls, min, int64(val))
if faceError {
require.Error(t, err)
} else {
require.NoError(t, err)
appendedSamples[key] = append(appendedSamples[key], sample{t: min, f: val})
appendedSamples[key] = append(appendedSamples[key], s)
totalSamples++
}
}
@ -5222,7 +5262,7 @@ func TestOOOAppendAndQuery(t *testing.T) {
expSamples[k] = append(expSamples[k], s)
}
}
require.Equal(t, expSamples, seriesSet)
requireEqualSeries(t, expSamples, seriesSet, true)
requireEqualOOOSamples(t, totalSamples-2, db)
}
@ -5284,6 +5324,14 @@ func TestOOOAppendAndQuery(t *testing.T) {
}
func TestOOODisabled(t *testing.T) {
for name, scenario := range sampleTypeScenarios {
t.Run(name, func(t *testing.T) {
testOOODisabled(t, scenario)
})
}
}
func testOOODisabled(t *testing.T, scenario sampleTypeScenario) {
opts := DefaultOptions()
opts.OutOfOrderTimeWindow = 0
db := openTestDB(t, opts, nil)
@ -5297,19 +5345,19 @@ func TestOOODisabled(t *testing.T) {
expSamples := make(map[string][]chunks.Sample)
totalSamples := 0
failedSamples := 0
addSample := func(lbls labels.Labels, fromMins, toMins int64, faceError bool) {
addSample := func(db *DB, lbls labels.Labels, fromMins, toMins int64, faceError bool) {
app := db.Appender(context.Background())
key := lbls.String()
from, to := minutes(fromMins), minutes(toMins)
for min := from; min <= to; min += time.Minute.Milliseconds() {
val := rand.Float64()
_, err := app.Append(0, lbls, min, val)
_, _, err := scenario.appendFunc(app, lbls, min, min)
if faceError {
require.Error(t, err)
failedSamples++
} else {
require.NoError(t, err)
expSamples[key] = append(expSamples[key], sample{t: min, f: val})
expSamples[key] = append(expSamples[key], scenario.sampleFunc(min, min))
totalSamples++
}
}
@ -5320,21 +5368,21 @@ func TestOOODisabled(t *testing.T) {
}
}
addSample(s1, 300, 300, false) // In-order samples.
addSample(s1, 250, 260, true) // Some ooo samples.
addSample(s1, 59, 59, true) // Out of time window.
addSample(s1, 60, 65, true) // At the edge of time window, also it would be "out of bound" without the ooo support.
addSample(s1, 59, 59, true) // Out of time window again.
addSample(s1, 301, 310, false) // More in-order samples.
addSample(db, s1, 300, 300, false) // In-order samples.
addSample(db, s1, 250, 260, true) // Some ooo samples.
addSample(db, s1, 59, 59, true) // Out of time window.
addSample(db, s1, 60, 65, true) // At the edge of time window, also it would be "out of bound" without the ooo support.
addSample(db, s1, 59, 59, true) // Out of time window again.
addSample(db, s1, 301, 310, false) // More in-order samples.
querier, err := db.Querier(math.MinInt64, math.MaxInt64)
require.NoError(t, err)
seriesSet := query(t, querier, labels.MustNewMatcher(labels.MatchRegexp, "foo", "bar."))
require.Equal(t, expSamples, seriesSet)
requireEqualSeries(t, expSamples, seriesSet, true)
requireEqualOOOSamples(t, 0, db)
require.Equal(t, float64(failedSamples),
prom_testutil.ToFloat64(db.head.metrics.outOfOrderSamples.WithLabelValues(sampleMetricTypeFloat))+prom_testutil.ToFloat64(db.head.metrics.outOfBoundSamples.WithLabelValues(sampleMetricTypeFloat)),
prom_testutil.ToFloat64(db.head.metrics.outOfOrderSamples.WithLabelValues(scenario.sampleType))+prom_testutil.ToFloat64(db.head.metrics.outOfBoundSamples.WithLabelValues(scenario.sampleType)),
"number of ooo/oob samples mismatch")
// Verifying that no OOO artifacts were generated.
@ -5349,6 +5397,14 @@ func TestOOODisabled(t *testing.T) {
}
func TestWBLAndMmapReplay(t *testing.T) {
for name, scenario := range sampleTypeScenarios {
t.Run(name, func(t *testing.T) {
testWBLAndMmapReplay(t, scenario)
})
}
}
func testWBLAndMmapReplay(t *testing.T, scenario sampleTypeScenario) {
opts := DefaultOptions()
opts.OutOfOrderCapMax = 30
opts.OutOfOrderTimeWindow = 4 * time.Hour.Milliseconds()
@ -5369,10 +5425,10 @@ func TestWBLAndMmapReplay(t *testing.T) {
key := lbls.String()
from, to := minutes(fromMins), minutes(toMins)
for min := from; min <= to; min += time.Minute.Milliseconds() {
val := rand.Float64()
_, err := app.Append(0, lbls, min, val)
val := rand.Intn(1000)
_, s, err := scenario.appendFunc(app, lbls, min, int64(val))
require.NoError(t, err)
expSamples[key] = append(expSamples[key], sample{t: min, f: val})
expSamples[key] = append(expSamples[key], s)
totalSamples++
}
require.NoError(t, app.Commit())
@ -5390,7 +5446,7 @@ func TestWBLAndMmapReplay(t *testing.T) {
})
exp[k] = v
}
require.Equal(t, exp, seriesSet)
requireEqualSeries(t, exp, seriesSet, true)
}
// In-order samples.
@ -5413,10 +5469,9 @@ func TestWBLAndMmapReplay(t *testing.T) {
chk, err := db.head.chunkDiskMapper.Chunk(mc.ref)
require.NoError(t, err)
it := chk.Iterator(nil)
for it.Next() == chunkenc.ValFloat {
ts, val := it.At()
s1MmapSamples = append(s1MmapSamples, sample{t: ts, f: val})
}
smpls, err := storage.ExpandSamples(it, newSample)
require.NoError(t, err)
s1MmapSamples = append(s1MmapSamples, smpls...)
}
require.NotEmpty(t, s1MmapSamples)
@ -5534,6 +5589,14 @@ func TestWBLAndMmapReplay(t *testing.T) {
}
func TestOOOCompactionFailure(t *testing.T) {
for name, scenario := range sampleTypeScenarios {
t.Run(name, func(t *testing.T) {
testOOOCompactionFailure(t, scenario)
})
}
}
func testOOOCompactionFailure(t *testing.T, scenario sampleTypeScenario) {
dir := t.TempDir()
ctx := context.Background()
@ -5554,7 +5617,7 @@ func TestOOOCompactionFailure(t *testing.T) {
app := db.Appender(context.Background())
for min := fromMins; min <= toMins; min++ {
ts := min * time.Minute.Milliseconds()
_, err := app.Append(0, series1, ts, float64(ts))
_, _, err := scenario.appendFunc(app, series1, ts, ts)
require.NoError(t, err)
}
require.NoError(t, app.Commit())
@ -5642,7 +5705,7 @@ func TestOOOCompactionFailure(t *testing.T) {
series1Samples := make([]chunks.Sample, 0, toMins-fromMins+1)
for min := fromMins; min <= toMins; min++ {
ts := min * time.Minute.Milliseconds()
series1Samples = append(series1Samples, sample{ts, float64(ts), nil, nil})
series1Samples = append(series1Samples, scenario.sampleFunc(ts, ts))
}
expRes := map[string][]chunks.Sample{
series1.String(): series1Samples,
@ -5650,9 +5713,8 @@ func TestOOOCompactionFailure(t *testing.T) {
q, err := NewBlockQuerier(block, math.MinInt64, math.MaxInt64)
require.NoError(t, err)
actRes := query(t, q, labels.MustNewMatcher(labels.MatchRegexp, "foo", "bar.*"))
require.Equal(t, expRes, actRes)
requireEqualSeries(t, expRes, actRes, true)
}
// Checking for expected data in the blocks.
@ -5819,6 +5881,14 @@ func TestWBLCorruption(t *testing.T) {
}
func TestOOOMmapCorruption(t *testing.T) {
for name, scenario := range sampleTypeScenarios {
t.Run(name, func(t *testing.T) {
testOOOMmapCorruption(t, scenario)
})
}
}
func testOOOMmapCorruption(t *testing.T, scenario sampleTypeScenario) {
dir := t.TempDir()
opts := DefaultOptions()
@ -5838,11 +5908,11 @@ func TestOOOMmapCorruption(t *testing.T) {
app := db.Appender(context.Background())
for min := fromMins; min <= toMins; min++ {
ts := min * time.Minute.Milliseconds()
_, err := app.Append(0, series1, ts, float64(ts))
_, s, err := scenario.appendFunc(app, series1, ts, ts)
require.NoError(t, err)
allSamples = append(allSamples, sample{t: ts, f: float64(ts)})
allSamples = append(allSamples, s)
if inMmapAfterCorruption {
expInMmapChunks = append(expInMmapChunks, sample{t: ts, f: float64(ts)})
expInMmapChunks = append(expInMmapChunks, s)
}
}
require.NoError(t, app.Commit())
@ -5880,7 +5950,7 @@ func TestOOOMmapCorruption(t *testing.T) {
require.NoError(t, err)
actRes := query(t, q, labels.MustNewMatcher(labels.MatchRegexp, "foo", "bar.*"))
require.Equal(t, expRes, actRes)
requireEqualSeries(t, expRes, actRes, true)
}
verifySamples(allSamples)
@ -5942,6 +6012,14 @@ func TestOOOMmapCorruption(t *testing.T) {
}
func TestOutOfOrderRuntimeConfig(t *testing.T) {
for name, scenario := range sampleTypeScenarios {
t.Run(name, func(t *testing.T) {
testOutOfOrderRuntimeConfig(t, scenario)
})
}
}
func testOutOfOrderRuntimeConfig(t *testing.T, scenario sampleTypeScenario) {
ctx := context.Background()
getDB := func(oooTimeWindow int64) *DB {
@ -5975,10 +6053,10 @@ func TestOutOfOrderRuntimeConfig(t *testing.T) {
app := db.Appender(context.Background())
for min := fromMins; min <= toMins; min++ {
ts := min * time.Minute.Milliseconds()
_, err := app.Append(0, series1, ts, float64(ts))
_, s, err := scenario.appendFunc(app, series1, ts, ts)
if success {
require.NoError(t, err)
allSamples = append(allSamples, sample{t: ts, f: float64(ts)})
allSamples = append(allSamples, s)
} else {
require.Error(t, err)
}
@ -6000,7 +6078,7 @@ func TestOutOfOrderRuntimeConfig(t *testing.T) {
require.NoError(t, err)
actRes := query(t, q, labels.MustNewMatcher(labels.MatchRegexp, "foo", "bar.*"))
require.Equal(t, expRes, actRes)
requireEqualSeries(t, expRes, actRes, true)
}
doOOOCompaction := func(t *testing.T, db *DB) {
@ -6173,12 +6251,20 @@ func TestOutOfOrderRuntimeConfig(t *testing.T) {
}
func TestNoGapAfterRestartWithOOO(t *testing.T) {
for name, scenario := range sampleTypeScenarios {
t.Run(name, func(t *testing.T) {
testNoGapAfterRestartWithOOO(t, scenario)
})
}
}
func testNoGapAfterRestartWithOOO(t *testing.T, scenario sampleTypeScenario) {
series1 := labels.FromStrings("foo", "bar1")
addSamples := func(t *testing.T, db *DB, fromMins, toMins int64, success bool) {
app := db.Appender(context.Background())
for min := fromMins; min <= toMins; min++ {
ts := min * time.Minute.Milliseconds()
_, err := app.Append(0, series1, ts, float64(ts))
_, _, err := scenario.appendFunc(app, series1, ts, ts)
if success {
require.NoError(t, err)
} else {
@ -6192,7 +6278,7 @@ func TestNoGapAfterRestartWithOOO(t *testing.T) {
var expSamples []chunks.Sample
for min := fromMins; min <= toMins; min++ {
ts := min * time.Minute.Milliseconds()
expSamples = append(expSamples, sample{t: ts, f: float64(ts)})
expSamples = append(expSamples, scenario.sampleFunc(ts, ts))
}
expRes := map[string][]chunks.Sample{
@ -6203,7 +6289,7 @@ func TestNoGapAfterRestartWithOOO(t *testing.T) {
require.NoError(t, err)
actRes := query(t, q, labels.MustNewMatcher(labels.MatchRegexp, "foo", "bar.*"))
require.Equal(t, expRes, actRes)
requireEqualSeries(t, expRes, actRes, true)
}
cases := []struct {
@ -6280,6 +6366,14 @@ func TestNoGapAfterRestartWithOOO(t *testing.T) {
}
func TestWblReplayAfterOOODisableAndRestart(t *testing.T) {
for name, scenario := range sampleTypeScenarios {
t.Run(name, func(t *testing.T) {
testWblReplayAfterOOODisableAndRestart(t, scenario)
})
}
}
func testWblReplayAfterOOODisableAndRestart(t *testing.T, scenario sampleTypeScenario) {
dir := t.TempDir()
opts := DefaultOptions()
@ -6298,9 +6392,9 @@ func TestWblReplayAfterOOODisableAndRestart(t *testing.T) {
app := db.Appender(context.Background())
for min := fromMins; min <= toMins; min++ {
ts := min * time.Minute.Milliseconds()
_, err := app.Append(0, series1, ts, float64(ts))
_, s, err := scenario.appendFunc(app, series1, ts, ts)
require.NoError(t, err)
allSamples = append(allSamples, sample{t: ts, f: float64(ts)})
allSamples = append(allSamples, s)
}
require.NoError(t, app.Commit())
}
@ -6323,7 +6417,7 @@ func TestWblReplayAfterOOODisableAndRestart(t *testing.T) {
require.NoError(t, err)
actRes := query(t, q, labels.MustNewMatcher(labels.MatchRegexp, "foo", "bar.*"))
require.Equal(t, expRes, actRes)
requireEqualSeries(t, expRes, actRes, true)
}
verifySamples(allSamples)
@ -6339,6 +6433,14 @@ func TestWblReplayAfterOOODisableAndRestart(t *testing.T) {
}
func TestPanicOnApplyConfig(t *testing.T) {
for name, scenario := range sampleTypeScenarios {
t.Run(name, func(t *testing.T) {
testPanicOnApplyConfig(t, scenario)
})
}
}
func testPanicOnApplyConfig(t *testing.T, scenario sampleTypeScenario) {
dir := t.TempDir()
opts := DefaultOptions()
@ -6357,9 +6459,9 @@ func TestPanicOnApplyConfig(t *testing.T) {
app := db.Appender(context.Background())
for min := fromMins; min <= toMins; min++ {
ts := min * time.Minute.Milliseconds()
_, err := app.Append(0, series1, ts, float64(ts))
_, s, err := scenario.appendFunc(app, series1, ts, ts)
require.NoError(t, err)
allSamples = append(allSamples, sample{t: ts, f: float64(ts)})
allSamples = append(allSamples, s)
}
require.NoError(t, app.Commit())
}
@ -6387,6 +6489,14 @@ func TestPanicOnApplyConfig(t *testing.T) {
}
func TestDiskFillingUpAfterDisablingOOO(t *testing.T) {
for name, scenario := range sampleTypeScenarios {
t.Run(name, func(t *testing.T) {
testDiskFillingUpAfterDisablingOOO(t, scenario)
})
}
}
func testDiskFillingUpAfterDisablingOOO(t *testing.T, scenario sampleTypeScenario) {
dir := t.TempDir()
ctx := context.Background()
@ -6406,9 +6516,9 @@ func TestDiskFillingUpAfterDisablingOOO(t *testing.T) {
app := db.Appender(context.Background())
for min := fromMins; min <= toMins; min++ {
ts := min * time.Minute.Milliseconds()
_, err := app.Append(0, series1, ts, float64(ts))
_, s, err := scenario.appendFunc(app, series1, ts, ts)
require.NoError(t, err)
allSamples = append(allSamples, sample{t: ts, f: float64(ts)})
allSamples = append(allSamples, s)
}
require.NoError(t, app.Commit())
}
@ -7060,12 +7170,6 @@ Outer:
require.NoError(t, writerErr)
}
func requireEqualOOOSamples(t *testing.T, expectedSamples int, db *DB) {
require.Equal(t, float64(expectedSamples),
prom_testutil.ToFloat64(db.head.metrics.outOfOrderSamplesAppended.WithLabelValues(sampleMetricTypeFloat)),
"number of ooo appended samples mismatch")
}
type mockCompactorFn struct {
planFn func() ([]string, error)
compactFn func() ([]ulid.ULID, error)

View file

@ -2665,6 +2665,14 @@ func TestIsolationWithoutAdd(t *testing.T) {
}
func TestOutOfOrderSamplesMetric(t *testing.T) {
for name, scenario := range sampleTypeScenarios {
t.Run(name, func(t *testing.T) {
testOutOfOrderSamplesMetric(t, scenario)
})
}
}
func testOutOfOrderSamplesMetric(t *testing.T, scenario sampleTypeScenario) {
dir := t.TempDir()
db, err := Open(dir, nil, nil, DefaultOptions(), nil)
@ -2674,33 +2682,38 @@ func TestOutOfOrderSamplesMetric(t *testing.T) {
}()
db.DisableCompactions()
appendSample := func(appender storage.Appender, ts int64) (storage.SeriesRef, error) {
ref, _, err := scenario.appendFunc(appender, labels.FromStrings("a", "b"), ts, 99)
return ref, err
}
ctx := context.Background()
app := db.Appender(ctx)
for i := 1; i <= 5; i++ {
_, err = app.Append(0, labels.FromStrings("a", "b"), int64(i), 99)
_, err = appendSample(app, int64(i))
require.NoError(t, err)
}
require.NoError(t, app.Commit())
// Test out of order metric.
require.Equal(t, 0.0, prom_testutil.ToFloat64(db.head.metrics.outOfOrderSamples.WithLabelValues(sampleMetricTypeFloat)))
require.Equal(t, 0.0, prom_testutil.ToFloat64(db.head.metrics.outOfOrderSamples.WithLabelValues(scenario.sampleType)))
app = db.Appender(ctx)
_, err = app.Append(0, labels.FromStrings("a", "b"), 2, 99)
_, err = appendSample(app, 2)
require.Equal(t, storage.ErrOutOfOrderSample, err)
require.Equal(t, 1.0, prom_testutil.ToFloat64(db.head.metrics.outOfOrderSamples.WithLabelValues(sampleMetricTypeFloat)))
require.Equal(t, 1.0, prom_testutil.ToFloat64(db.head.metrics.outOfOrderSamples.WithLabelValues(scenario.sampleType)))
_, err = app.Append(0, labels.FromStrings("a", "b"), 3, 99)
_, err = appendSample(app, 3)
require.Equal(t, storage.ErrOutOfOrderSample, err)
require.Equal(t, 2.0, prom_testutil.ToFloat64(db.head.metrics.outOfOrderSamples.WithLabelValues(sampleMetricTypeFloat)))
require.Equal(t, 2.0, prom_testutil.ToFloat64(db.head.metrics.outOfOrderSamples.WithLabelValues(scenario.sampleType)))
_, err = app.Append(0, labels.FromStrings("a", "b"), 4, 99)
_, err = appendSample(app, 4)
require.Equal(t, storage.ErrOutOfOrderSample, err)
require.Equal(t, 3.0, prom_testutil.ToFloat64(db.head.metrics.outOfOrderSamples.WithLabelValues(sampleMetricTypeFloat)))
require.Equal(t, 3.0, prom_testutil.ToFloat64(db.head.metrics.outOfOrderSamples.WithLabelValues(scenario.sampleType)))
require.NoError(t, app.Commit())
// Compact Head to test out of bound metric.
app = db.Appender(ctx)
_, err = app.Append(0, labels.FromStrings("a", "b"), DefaultBlockDuration*2, 99)
_, err = appendSample(app, DefaultBlockDuration*2)
require.NoError(t, err)
require.NoError(t, app.Commit())
@ -2709,36 +2722,36 @@ func TestOutOfOrderSamplesMetric(t *testing.T) {
require.Greater(t, db.head.minValidTime.Load(), int64(0))
app = db.Appender(ctx)
_, err = app.Append(0, labels.FromStrings("a", "b"), db.head.minValidTime.Load()-2, 99)
_, err = appendSample(app, db.head.minValidTime.Load()-2)
require.Equal(t, storage.ErrOutOfBounds, err)
require.Equal(t, 1.0, prom_testutil.ToFloat64(db.head.metrics.outOfBoundSamples.WithLabelValues(sampleMetricTypeFloat)))
require.Equal(t, 1.0, prom_testutil.ToFloat64(db.head.metrics.outOfBoundSamples.WithLabelValues(scenario.sampleType)))
_, err = app.Append(0, labels.FromStrings("a", "b"), db.head.minValidTime.Load()-1, 99)
_, err = appendSample(app, db.head.minValidTime.Load()-1)
require.Equal(t, storage.ErrOutOfBounds, err)
require.Equal(t, 2.0, prom_testutil.ToFloat64(db.head.metrics.outOfBoundSamples.WithLabelValues(sampleMetricTypeFloat)))
require.Equal(t, 2.0, prom_testutil.ToFloat64(db.head.metrics.outOfBoundSamples.WithLabelValues(scenario.sampleType)))
require.NoError(t, app.Commit())
// Some more valid samples for out of order.
app = db.Appender(ctx)
for i := 1; i <= 5; i++ {
_, err = app.Append(0, labels.FromStrings("a", "b"), db.head.minValidTime.Load()+DefaultBlockDuration+int64(i), 99)
_, err = appendSample(app, db.head.minValidTime.Load()+DefaultBlockDuration+int64(i))
require.NoError(t, err)
}
require.NoError(t, app.Commit())
// Test out of order metric.
app = db.Appender(ctx)
_, err = app.Append(0, labels.FromStrings("a", "b"), db.head.minValidTime.Load()+DefaultBlockDuration+2, 99)
_, err = appendSample(app, db.head.minValidTime.Load()+DefaultBlockDuration+2)
require.Equal(t, storage.ErrOutOfOrderSample, err)
require.Equal(t, 4.0, prom_testutil.ToFloat64(db.head.metrics.outOfOrderSamples.WithLabelValues(sampleMetricTypeFloat)))
require.Equal(t, 4.0, prom_testutil.ToFloat64(db.head.metrics.outOfOrderSamples.WithLabelValues(scenario.sampleType)))
_, err = app.Append(0, labels.FromStrings("a", "b"), db.head.minValidTime.Load()+DefaultBlockDuration+3, 99)
_, err = appendSample(app, db.head.minValidTime.Load()+DefaultBlockDuration+3)
require.Equal(t, storage.ErrOutOfOrderSample, err)
require.Equal(t, 5.0, prom_testutil.ToFloat64(db.head.metrics.outOfOrderSamples.WithLabelValues(sampleMetricTypeFloat)))
require.Equal(t, 5.0, prom_testutil.ToFloat64(db.head.metrics.outOfOrderSamples.WithLabelValues(scenario.sampleType)))
_, err = app.Append(0, labels.FromStrings("a", "b"), db.head.minValidTime.Load()+DefaultBlockDuration+4, 99)
_, err = appendSample(app, db.head.minValidTime.Load()+DefaultBlockDuration+4)
require.Equal(t, storage.ErrOutOfOrderSample, err)
require.Equal(t, 6.0, prom_testutil.ToFloat64(db.head.metrics.outOfOrderSamples.WithLabelValues(sampleMetricTypeFloat)))
require.Equal(t, 6.0, prom_testutil.ToFloat64(db.head.metrics.outOfOrderSamples.WithLabelValues(scenario.sampleType)))
require.NoError(t, app.Commit())
}
@ -4801,6 +4814,14 @@ func TestWBLReplay(t *testing.T) {
// TestOOOMmapReplay checks the replay at a low level.
func TestOOOMmapReplay(t *testing.T) {
for name, scenario := range sampleTypeScenarios {
t.Run(name, func(t *testing.T) {
testOOOMmapReplay(t, scenario)
})
}
}
func testOOOMmapReplay(t *testing.T, scenario sampleTypeScenario) {
dir := t.TempDir()
wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, wlog.CompressionSnappy)
require.NoError(t, err)
@ -4820,8 +4841,7 @@ func TestOOOMmapReplay(t *testing.T) {
l := labels.FromStrings("foo", "bar")
appendSample := func(mins int64) {
app := h.Appender(context.Background())
ts, v := mins*time.Minute.Milliseconds(), float64(mins)
_, err := app.Append(0, l, ts, v)
_, _, err := scenario.appendFunc(app, l, mins*time.Minute.Milliseconds(), mins)
require.NoError(t, err)
require.NoError(t, app.Commit())
}
@ -5096,6 +5116,14 @@ func TestReplayAfterMmapReplayError(t *testing.T) {
}
func TestOOOAppendWithNoSeries(t *testing.T) {
for name, scenario := range sampleTypeScenarios {
t.Run(name, func(t *testing.T) {
testOOOAppendWithNoSeries(t, scenario.appendFunc)
})
}
}
func testOOOAppendWithNoSeries(t *testing.T, appendFunc func(appender storage.Appender, lbls labels.Labels, ts, value int64) (storage.SeriesRef, sample, error)) {
dir := t.TempDir()
wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, wlog.CompressionSnappy)
require.NoError(t, err)
@ -5116,7 +5144,7 @@ func TestOOOAppendWithNoSeries(t *testing.T) {
appendSample := func(lbls labels.Labels, ts int64) {
app := h.Appender(context.Background())
_, err := app.Append(0, lbls, ts*time.Minute.Milliseconds(), float64(ts))
_, _, err := appendFunc(app, lbls, ts*time.Minute.Milliseconds(), ts)
require.NoError(t, err)
require.NoError(t, app.Commit())
}
@ -5164,7 +5192,7 @@ func TestOOOAppendWithNoSeries(t *testing.T) {
// Now 179m is too old.
s4 := newLabels(4)
app := h.Appender(context.Background())
_, err = app.Append(0, s4, 179*time.Minute.Milliseconds(), float64(179))
_, _, err = appendFunc(app, s4, 179*time.Minute.Milliseconds(), 179)
require.Equal(t, storage.ErrTooOldSample, err)
require.NoError(t, app.Rollback())
verifyOOOSamples(s3, 1)
@ -5177,6 +5205,14 @@ func TestOOOAppendWithNoSeries(t *testing.T) {
}
func TestHeadMinOOOTimeUpdate(t *testing.T) {
for name, scenario := range sampleTypeScenarios {
t.Run(name, func(t *testing.T) {
testHeadMinOOOTimeUpdate(t, scenario)
})
}
}
func testHeadMinOOOTimeUpdate(t *testing.T, scenario sampleTypeScenario) {
dir := t.TempDir()
wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, wlog.CompressionSnappy)
require.NoError(t, err)
@ -5195,15 +5231,13 @@ func TestHeadMinOOOTimeUpdate(t *testing.T) {
require.NoError(t, h.Init(0))
appendSample := func(ts int64) {
lbls := labels.FromStrings("foo", "bar")
app := h.Appender(context.Background())
_, err := app.Append(0, lbls, ts*time.Minute.Milliseconds(), float64(ts))
_, _, err = scenario.appendFunc(app, labels.FromStrings("a", "b"), ts*time.Minute.Milliseconds(), ts)
require.NoError(t, err)
require.NoError(t, app.Commit())
}
appendSample(300) // In-order sample.
require.Equal(t, int64(math.MaxInt64), h.MinOOOTime())
appendSample(295) // OOO sample.

View file

@ -359,6 +359,15 @@ func TestOOOHeadIndexReader_Series(t *testing.T) {
}
func TestOOOHeadChunkReader_LabelValues(t *testing.T) {
for name, scenario := range sampleTypeScenarios {
t.Run(name, func(t *testing.T) {
testOOOHeadChunkReader_LabelValues(t, scenario)
})
}
}
//nolint:revive // unexported-return.
func testOOOHeadChunkReader_LabelValues(t *testing.T, scenario sampleTypeScenario) {
chunkRange := int64(2000)
head, _ := newTestHead(t, chunkRange, wlog.CompressionNone, true)
t.Cleanup(func() { require.NoError(t, head.Close()) })
@ -368,15 +377,15 @@ func TestOOOHeadChunkReader_LabelValues(t *testing.T) {
app := head.Appender(context.Background())
// Add in-order samples
_, err := app.Append(0, labels.FromStrings("foo", "bar1"), 100, 1)
_, _, err := scenario.appendFunc(app, labels.FromStrings("foo", "bar1"), 100, int64(1))
require.NoError(t, err)
_, err = app.Append(0, labels.FromStrings("foo", "bar2"), 100, 2)
_, _, err = scenario.appendFunc(app, labels.FromStrings("foo", "bar2"), 100, int64(2))
require.NoError(t, err)
// Add ooo samples for those series
_, err = app.Append(0, labels.FromStrings("foo", "bar1"), 90, 1)
_, _, err = scenario.appendFunc(app, labels.FromStrings("foo", "bar1"), 90, int64(1))
require.NoError(t, err)
_, err = app.Append(0, labels.FromStrings("foo", "bar2"), 90, 2)
_, _, err = scenario.appendFunc(app, labels.FromStrings("foo", "bar2"), 90, int64(2))
require.NoError(t, err)
require.NoError(t, app.Commit())
@ -453,6 +462,15 @@ func TestOOOHeadChunkReader_LabelValues(t *testing.T) {
// It does so by appending out of order samples to the db and then initializing
// an OOOHeadChunkReader to read chunks from it.
func TestOOOHeadChunkReader_Chunk(t *testing.T) {
for name, scenario := range sampleTypeScenarios {
t.Run(name, func(t *testing.T) {
testOOOHeadChunkReader_Chunk(t, scenario)
})
}
}
//nolint:revive // unexported-return.
func testOOOHeadChunkReader_Chunk(t *testing.T, scenario sampleTypeScenario) {
opts := DefaultOptions()
opts.OutOfOrderCapMax = 5
opts.OutOfOrderTimeWindow = 120 * time.Minute.Milliseconds()
@ -460,12 +478,6 @@ func TestOOOHeadChunkReader_Chunk(t *testing.T) {
s1 := labels.FromStrings("l", "v1")
minutes := func(m int64) int64 { return m * time.Minute.Milliseconds() }
appendSample := func(app storage.Appender, l labels.Labels, timestamp int64, value float64) storage.SeriesRef {
ref, err := app.Append(0, l, timestamp, value)
require.NoError(t, err)
return ref
}
t.Run("Getting a non existing chunk fails with not found error", func(t *testing.T) {
db := newTestDBWithOpts(t, opts)
@ -484,7 +496,7 @@ func TestOOOHeadChunkReader_Chunk(t *testing.T) {
queryMinT int64
queryMaxT int64
firstInOrderSampleAt int64
inputSamples chunks.SampleSlice
inputSamples []testValue
expChunkError bool
expChunksSamples []chunks.SampleSlice
}{
@ -493,9 +505,9 @@ func TestOOOHeadChunkReader_Chunk(t *testing.T) {
queryMinT: minutes(0),
queryMaxT: minutes(100),
firstInOrderSampleAt: minutes(120),
inputSamples: chunks.SampleSlice{
sample{t: minutes(30), f: float64(0)},
sample{t: minutes(40), f: float64(0)},
inputSamples: []testValue{
{Ts: minutes(30), V: 0},
{Ts: minutes(40), V: 0},
},
expChunkError: false,
// ts (in minutes) 0 10 20 30 40 50 60 70 80 90 100
@ -504,8 +516,8 @@ func TestOOOHeadChunkReader_Chunk(t *testing.T) {
// Output Graphically [--------] (With 2 samples)
expChunksSamples: []chunks.SampleSlice{
{
sample{t: minutes(30), f: float64(0)},
sample{t: minutes(40), f: float64(0)},
scenario.sampleFunc(minutes(30), 0),
scenario.sampleFunc(minutes(40), 0),
},
},
},
@ -514,19 +526,8 @@ func TestOOOHeadChunkReader_Chunk(t *testing.T) {
queryMinT: minutes(0),
queryMaxT: minutes(100),
firstInOrderSampleAt: minutes(120),
inputSamples: chunks.SampleSlice{
// opts.OOOCapMax is 5 so these will be mmapped to the first mmapped chunk
sample{t: minutes(41), f: float64(0)},
sample{t: minutes(42), f: float64(0)},
sample{t: minutes(43), f: float64(0)},
sample{t: minutes(44), f: float64(0)},
sample{t: minutes(45), f: float64(0)},
// The following samples will go to the head chunk, and we want it
// to overlap with the previous chunk
sample{t: minutes(30), f: float64(1)},
sample{t: minutes(50), f: float64(1)},
},
expChunkError: false,
inputSamples: []testValue{{Ts: minutes(41), V: 0}, {Ts: minutes(42), V: 0}, {Ts: minutes(43), V: 0}, {Ts: minutes(44), V: 0}, {Ts: minutes(45), V: 0}, {Ts: minutes(30), V: 1}, {Ts: minutes(50), V: 1}},
expChunkError: false,
// ts (in minutes) 0 10 20 30 40 50 60 70 80 90 100
// Query Interval [------------------------------------------------------------------------------------------]
// Chunk 0 [---] (With 5 samples)
@ -534,13 +535,13 @@ func TestOOOHeadChunkReader_Chunk(t *testing.T) {
// Output Graphically [-----------------] (With 7 samples)
expChunksSamples: []chunks.SampleSlice{
{
sample{t: minutes(30), f: float64(1)},
sample{t: minutes(41), f: float64(0)},
sample{t: minutes(42), f: float64(0)},
sample{t: minutes(43), f: float64(0)},
sample{t: minutes(44), f: float64(0)},
sample{t: minutes(45), f: float64(0)},
sample{t: minutes(50), f: float64(1)},
scenario.sampleFunc(minutes(30), 1),
scenario.sampleFunc(minutes(41), 0),
scenario.sampleFunc(minutes(42), 0),
scenario.sampleFunc(minutes(43), 0),
scenario.sampleFunc(minutes(44), 0),
scenario.sampleFunc(minutes(45), 0),
scenario.sampleFunc(minutes(50), 1),
},
},
},
@ -549,28 +550,28 @@ func TestOOOHeadChunkReader_Chunk(t *testing.T) {
queryMinT: minutes(0),
queryMaxT: minutes(100),
firstInOrderSampleAt: minutes(120),
inputSamples: chunks.SampleSlice{
inputSamples: []testValue{
// Chunk 0
sample{t: minutes(10), f: float64(0)},
sample{t: minutes(12), f: float64(0)},
sample{t: minutes(14), f: float64(0)},
sample{t: minutes(16), f: float64(0)},
sample{t: minutes(20), f: float64(0)},
{Ts: minutes(10), V: 0},
{Ts: minutes(12), V: 0},
{Ts: minutes(14), V: 0},
{Ts: minutes(16), V: 0},
{Ts: minutes(20), V: 0},
// Chunk 1
sample{t: minutes(20), f: float64(1)},
sample{t: minutes(22), f: float64(1)},
sample{t: minutes(24), f: float64(1)},
sample{t: minutes(26), f: float64(1)},
sample{t: minutes(29), f: float64(1)},
// Chunk 2
sample{t: minutes(30), f: float64(2)},
sample{t: minutes(32), f: float64(2)},
sample{t: minutes(34), f: float64(2)},
sample{t: minutes(36), f: float64(2)},
sample{t: minutes(40), f: float64(2)},
{Ts: minutes(20), V: 1},
{Ts: minutes(22), V: 1},
{Ts: minutes(24), V: 1},
{Ts: minutes(26), V: 1},
{Ts: minutes(29), V: 1},
// Chunk 3
{Ts: minutes(30), V: 2},
{Ts: minutes(32), V: 2},
{Ts: minutes(34), V: 2},
{Ts: minutes(36), V: 2},
{Ts: minutes(40), V: 2},
// Head
sample{t: minutes(40), f: float64(3)},
sample{t: minutes(50), f: float64(3)},
{Ts: minutes(40), V: 3},
{Ts: minutes(50), V: 3},
},
expChunkError: false,
// ts (in minutes) 0 10 20 30 40 50 60 70 80 90 100
@ -582,23 +583,23 @@ func TestOOOHeadChunkReader_Chunk(t *testing.T) {
// Output Graphically [----------------][-----------------]
expChunksSamples: []chunks.SampleSlice{
{
sample{t: minutes(10), f: float64(0)},
sample{t: minutes(12), f: float64(0)},
sample{t: minutes(14), f: float64(0)},
sample{t: minutes(16), f: float64(0)},
sample{t: minutes(20), f: float64(1)},
sample{t: minutes(22), f: float64(1)},
sample{t: minutes(24), f: float64(1)},
sample{t: minutes(26), f: float64(1)},
sample{t: minutes(29), f: float64(1)},
scenario.sampleFunc(minutes(10), 0),
scenario.sampleFunc(minutes(12), 0),
scenario.sampleFunc(minutes(14), 0),
scenario.sampleFunc(minutes(16), 0),
scenario.sampleFunc(minutes(20), 1),
scenario.sampleFunc(minutes(22), 1),
scenario.sampleFunc(minutes(24), 1),
scenario.sampleFunc(minutes(26), 1),
scenario.sampleFunc(minutes(29), 1),
},
{
sample{t: minutes(30), f: float64(2)},
sample{t: minutes(32), f: float64(2)},
sample{t: minutes(34), f: float64(2)},
sample{t: minutes(36), f: float64(2)},
sample{t: minutes(40), f: float64(3)},
sample{t: minutes(50), f: float64(3)},
scenario.sampleFunc(minutes(30), 2),
scenario.sampleFunc(minutes(32), 2),
scenario.sampleFunc(minutes(34), 2),
scenario.sampleFunc(minutes(36), 2),
scenario.sampleFunc(minutes(40), 3),
scenario.sampleFunc(minutes(50), 3),
},
},
},
@ -607,28 +608,28 @@ func TestOOOHeadChunkReader_Chunk(t *testing.T) {
queryMinT: minutes(0),
queryMaxT: minutes(100),
firstInOrderSampleAt: minutes(120),
inputSamples: chunks.SampleSlice{
inputSamples: []testValue{
// Chunk 0
sample{t: minutes(40), f: float64(0)},
sample{t: minutes(42), f: float64(0)},
sample{t: minutes(44), f: float64(0)},
sample{t: minutes(46), f: float64(0)},
sample{t: minutes(50), f: float64(0)},
{Ts: minutes(40), V: 0},
{Ts: minutes(42), V: 0},
{Ts: minutes(44), V: 0},
{Ts: minutes(46), V: 0},
{Ts: minutes(50), V: 0},
// Chunk 1
sample{t: minutes(30), f: float64(1)},
sample{t: minutes(32), f: float64(1)},
sample{t: minutes(34), f: float64(1)},
sample{t: minutes(36), f: float64(1)},
sample{t: minutes(40), f: float64(1)},
// Chunk 2
sample{t: minutes(20), f: float64(2)},
sample{t: minutes(22), f: float64(2)},
sample{t: minutes(24), f: float64(2)},
sample{t: minutes(26), f: float64(2)},
sample{t: minutes(29), f: float64(2)},
{Ts: minutes(30), V: 1},
{Ts: minutes(32), V: 1},
{Ts: minutes(34), V: 1},
{Ts: minutes(36), V: 1},
{Ts: minutes(40), V: 1},
// Chunk 3
{Ts: minutes(20), V: 2},
{Ts: minutes(22), V: 2},
{Ts: minutes(24), V: 2},
{Ts: minutes(26), V: 2},
{Ts: minutes(29), V: 2},
// Head
sample{t: minutes(10), f: float64(3)},
sample{t: minutes(20), f: float64(3)},
{Ts: minutes(10), V: 3},
{Ts: minutes(20), V: 3},
},
expChunkError: false,
// ts (in minutes) 0 10 20 30 40 50 60 70 80 90 100
@ -640,23 +641,23 @@ func TestOOOHeadChunkReader_Chunk(t *testing.T) {
// Output Graphically [----------------][-----------------]
expChunksSamples: []chunks.SampleSlice{
{
sample{t: minutes(10), f: float64(3)},
sample{t: minutes(20), f: float64(2)},
sample{t: minutes(22), f: float64(2)},
sample{t: minutes(24), f: float64(2)},
sample{t: minutes(26), f: float64(2)},
sample{t: minutes(29), f: float64(2)},
scenario.sampleFunc(minutes(10), 3),
scenario.sampleFunc(minutes(20), 2),
scenario.sampleFunc(minutes(22), 2),
scenario.sampleFunc(minutes(24), 2),
scenario.sampleFunc(minutes(26), 2),
scenario.sampleFunc(minutes(29), 2),
},
{
sample{t: minutes(30), f: float64(1)},
sample{t: minutes(32), f: float64(1)},
sample{t: minutes(34), f: float64(1)},
sample{t: minutes(36), f: float64(1)},
sample{t: minutes(40), f: float64(0)},
sample{t: minutes(42), f: float64(0)},
sample{t: minutes(44), f: float64(0)},
sample{t: minutes(46), f: float64(0)},
sample{t: minutes(50), f: float64(0)},
scenario.sampleFunc(minutes(30), 1),
scenario.sampleFunc(minutes(32), 1),
scenario.sampleFunc(minutes(34), 1),
scenario.sampleFunc(minutes(36), 1),
scenario.sampleFunc(minutes(40), 0),
scenario.sampleFunc(minutes(42), 0),
scenario.sampleFunc(minutes(44), 0),
scenario.sampleFunc(minutes(46), 0),
scenario.sampleFunc(minutes(50), 0),
},
},
},
@ -665,28 +666,28 @@ func TestOOOHeadChunkReader_Chunk(t *testing.T) {
queryMinT: minutes(0),
queryMaxT: minutes(100),
firstInOrderSampleAt: minutes(120),
inputSamples: chunks.SampleSlice{
inputSamples: []testValue{
// Chunk 0
sample{t: minutes(10), f: float64(0)},
sample{t: minutes(12), f: float64(0)},
sample{t: minutes(14), f: float64(0)},
sample{t: minutes(16), f: float64(0)},
sample{t: minutes(18), f: float64(0)},
{Ts: minutes(10), V: 0},
{Ts: minutes(12), V: 0},
{Ts: minutes(14), V: 0},
{Ts: minutes(16), V: 0},
{Ts: minutes(18), V: 0},
// Chunk 1
sample{t: minutes(20), f: float64(1)},
sample{t: minutes(22), f: float64(1)},
sample{t: minutes(24), f: float64(1)},
sample{t: minutes(26), f: float64(1)},
sample{t: minutes(28), f: float64(1)},
// Chunk 2
sample{t: minutes(30), f: float64(2)},
sample{t: minutes(32), f: float64(2)},
sample{t: minutes(34), f: float64(2)},
sample{t: minutes(36), f: float64(2)},
sample{t: minutes(38), f: float64(2)},
{Ts: minutes(20), V: 1},
{Ts: minutes(22), V: 1},
{Ts: minutes(24), V: 1},
{Ts: minutes(26), V: 1},
{Ts: minutes(28), V: 1},
// Chunk 3
{Ts: minutes(30), V: 2},
{Ts: minutes(32), V: 2},
{Ts: minutes(34), V: 2},
{Ts: minutes(36), V: 2},
{Ts: minutes(38), V: 2},
// Head
sample{t: minutes(40), f: float64(3)},
sample{t: minutes(42), f: float64(3)},
{Ts: minutes(40), V: 3},
{Ts: minutes(42), V: 3},
},
expChunkError: false,
// ts (in minutes) 0 10 20 30 40 50 60 70 80 90 100
@ -698,29 +699,29 @@ func TestOOOHeadChunkReader_Chunk(t *testing.T) {
// Output Graphically [-------][-------][-------][--------]
expChunksSamples: []chunks.SampleSlice{
{
sample{t: minutes(10), f: float64(0)},
sample{t: minutes(12), f: float64(0)},
sample{t: minutes(14), f: float64(0)},
sample{t: minutes(16), f: float64(0)},
sample{t: minutes(18), f: float64(0)},
scenario.sampleFunc(minutes(10), 0),
scenario.sampleFunc(minutes(12), 0),
scenario.sampleFunc(minutes(14), 0),
scenario.sampleFunc(minutes(16), 0),
scenario.sampleFunc(minutes(18), 0),
},
{
sample{t: minutes(20), f: float64(1)},
sample{t: minutes(22), f: float64(1)},
sample{t: minutes(24), f: float64(1)},
sample{t: minutes(26), f: float64(1)},
sample{t: minutes(28), f: float64(1)},
scenario.sampleFunc(minutes(20), 1),
scenario.sampleFunc(minutes(22), 1),
scenario.sampleFunc(minutes(24), 1),
scenario.sampleFunc(minutes(26), 1),
scenario.sampleFunc(minutes(28), 1),
},
{
sample{t: minutes(30), f: float64(2)},
sample{t: minutes(32), f: float64(2)},
sample{t: minutes(34), f: float64(2)},
sample{t: minutes(36), f: float64(2)},
sample{t: minutes(38), f: float64(2)},
scenario.sampleFunc(minutes(30), 2),
scenario.sampleFunc(minutes(32), 2),
scenario.sampleFunc(minutes(34), 2),
scenario.sampleFunc(minutes(36), 2),
scenario.sampleFunc(minutes(38), 2),
},
{
sample{t: minutes(40), f: float64(3)},
sample{t: minutes(42), f: float64(3)},
scenario.sampleFunc(minutes(40), 3),
scenario.sampleFunc(minutes(42), 3),
},
},
},
@ -729,22 +730,22 @@ func TestOOOHeadChunkReader_Chunk(t *testing.T) {
queryMinT: minutes(0),
queryMaxT: minutes(100),
firstInOrderSampleAt: minutes(120),
inputSamples: chunks.SampleSlice{
inputSamples: []testValue{
// Chunk 0
sample{t: minutes(10), f: float64(0)},
sample{t: minutes(15), f: float64(0)},
sample{t: minutes(20), f: float64(0)},
sample{t: minutes(25), f: float64(0)},
sample{t: minutes(30), f: float64(0)},
{Ts: minutes(10), V: 0},
{Ts: minutes(15), V: 0},
{Ts: minutes(20), V: 0},
{Ts: minutes(25), V: 0},
{Ts: minutes(30), V: 0},
// Chunk 1
sample{t: minutes(20), f: float64(1)},
sample{t: minutes(25), f: float64(1)},
sample{t: minutes(30), f: float64(1)},
sample{t: minutes(35), f: float64(1)},
sample{t: minutes(42), f: float64(1)},
{Ts: minutes(20), V: 1},
{Ts: minutes(25), V: 1},
{Ts: minutes(30), V: 1},
{Ts: minutes(35), V: 1},
{Ts: minutes(42), V: 1},
// Chunk 2 Head
sample{t: minutes(32), f: float64(2)},
sample{t: minutes(50), f: float64(2)},
{Ts: minutes(32), V: 2},
{Ts: minutes(50), V: 2},
},
expChunkError: false,
// ts (in minutes) 0 10 20 30 40 50 60 70 80 90 100
@ -755,15 +756,15 @@ func TestOOOHeadChunkReader_Chunk(t *testing.T) {
// Output Graphically [-----------------------------------]
expChunksSamples: []chunks.SampleSlice{
{
sample{t: minutes(10), f: float64(0)},
sample{t: minutes(15), f: float64(0)},
sample{t: minutes(20), f: float64(1)},
sample{t: minutes(25), f: float64(1)},
sample{t: minutes(30), f: float64(1)},
sample{t: minutes(32), f: float64(2)},
sample{t: minutes(35), f: float64(1)},
sample{t: minutes(42), f: float64(1)},
sample{t: minutes(50), f: float64(2)},
scenario.sampleFunc(minutes(10), 0),
scenario.sampleFunc(minutes(15), 0),
scenario.sampleFunc(minutes(20), 1),
scenario.sampleFunc(minutes(25), 1),
scenario.sampleFunc(minutes(30), 1),
scenario.sampleFunc(minutes(32), 2),
scenario.sampleFunc(minutes(35), 1),
scenario.sampleFunc(minutes(42), 1),
scenario.sampleFunc(minutes(50), 2),
},
},
},
@ -772,22 +773,22 @@ func TestOOOHeadChunkReader_Chunk(t *testing.T) {
queryMinT: minutes(12),
queryMaxT: minutes(33),
firstInOrderSampleAt: minutes(120),
inputSamples: chunks.SampleSlice{
inputSamples: []testValue{
// Chunk 0
sample{t: minutes(10), f: float64(0)},
sample{t: minutes(15), f: float64(0)},
sample{t: minutes(20), f: float64(0)},
sample{t: minutes(25), f: float64(0)},
sample{t: minutes(30), f: float64(0)},
{Ts: minutes(10), V: 0},
{Ts: minutes(15), V: 0},
{Ts: minutes(20), V: 0},
{Ts: minutes(25), V: 0},
{Ts: minutes(30), V: 0},
// Chunk 1
sample{t: minutes(20), f: float64(1)},
sample{t: minutes(25), f: float64(1)},
sample{t: minutes(30), f: float64(1)},
sample{t: minutes(35), f: float64(1)},
sample{t: minutes(42), f: float64(1)},
{Ts: minutes(20), V: 1},
{Ts: minutes(25), V: 1},
{Ts: minutes(30), V: 1},
{Ts: minutes(35), V: 1},
{Ts: minutes(42), V: 1},
// Chunk 2 Head
sample{t: minutes(32), f: float64(2)},
sample{t: minutes(50), f: float64(2)},
{Ts: minutes(32), V: 2},
{Ts: minutes(50), V: 2},
},
expChunkError: false,
// ts (in minutes) 0 10 20 30 40 50 60 70 80 90 100
@ -798,15 +799,15 @@ func TestOOOHeadChunkReader_Chunk(t *testing.T) {
// Output Graphically [-----------------------------------]
expChunksSamples: []chunks.SampleSlice{
{
sample{t: minutes(10), f: float64(0)},
sample{t: minutes(15), f: float64(0)},
sample{t: minutes(20), f: float64(1)},
sample{t: minutes(25), f: float64(1)},
sample{t: minutes(30), f: float64(1)},
sample{t: minutes(32), f: float64(2)},
sample{t: minutes(35), f: float64(1)},
sample{t: minutes(42), f: float64(1)},
sample{t: minutes(50), f: float64(2)},
scenario.sampleFunc(minutes(10), 0),
scenario.sampleFunc(minutes(15), 0),
scenario.sampleFunc(minutes(20), 1),
scenario.sampleFunc(minutes(25), 1),
scenario.sampleFunc(minutes(30), 1),
scenario.sampleFunc(minutes(32), 2),
scenario.sampleFunc(minutes(35), 1),
scenario.sampleFunc(minutes(42), 1),
scenario.sampleFunc(minutes(50), 2),
},
},
},
@ -817,13 +818,15 @@ func TestOOOHeadChunkReader_Chunk(t *testing.T) {
db := newTestDBWithOpts(t, opts)
app := db.Appender(context.Background())
s1Ref := appendSample(app, s1, tc.firstInOrderSampleAt, float64(tc.firstInOrderSampleAt/1*time.Minute.Milliseconds()))
s1Ref, _, err := scenario.appendFunc(app, s1, tc.firstInOrderSampleAt, tc.firstInOrderSampleAt/1*time.Minute.Milliseconds())
require.NoError(t, err)
require.NoError(t, app.Commit())
// OOO few samples for s1.
app = db.Appender(context.Background())
for _, s := range tc.inputSamples {
appendSample(app, s1, s.T(), s.F())
_, _, err := scenario.appendFunc(app, s1, s.Ts, s.V)
require.NoError(t, err)
}
require.NoError(t, app.Commit())
@ -832,7 +835,7 @@ func TestOOOHeadChunkReader_Chunk(t *testing.T) {
ir := NewOOOHeadIndexReader(db.head, tc.queryMinT, tc.queryMaxT, 0)
var chks []chunks.Meta
var b labels.ScratchBuilder
err := ir.Series(s1Ref, &b, &chks)
err = ir.Series(s1Ref, &b, &chks)
require.NoError(t, err)
require.Equal(t, len(tc.expChunksSamples), len(chks))
@ -843,13 +846,10 @@ func TestOOOHeadChunkReader_Chunk(t *testing.T) {
require.NoError(t, err)
require.Nil(t, c)
var resultSamples chunks.SampleSlice
it := iterable.Iterator(nil)
for it.Next() == chunkenc.ValFloat {
t, v := it.At()
resultSamples = append(resultSamples, sample{t: t, f: v})
}
require.Equal(t, tc.expChunksSamples[i], resultSamples)
resultSamples, err := storage.ExpandSamples(it, nil)
require.NoError(t, err)
requireEqualSamples(t, s1.String(), tc.expChunksSamples[i], resultSamples, true)
}
})
}
@ -864,6 +864,15 @@ func TestOOOHeadChunkReader_Chunk(t *testing.T) {
// - Response B comes from : Series(), in parallel new samples added to the head, then Chunk()
// - A == B
func TestOOOHeadChunkReader_Chunk_ConsistentQueryResponseDespiteOfHeadExpanding(t *testing.T) {
for name, scenario := range sampleTypeScenarios {
t.Run(name, func(t *testing.T) {
testOOOHeadChunkReader_Chunk_ConsistentQueryResponseDespiteOfHeadExpanding(t, scenario)
})
}
}
//nolint:revive // unexported-return.
func testOOOHeadChunkReader_Chunk_ConsistentQueryResponseDespiteOfHeadExpanding(t *testing.T, scenario sampleTypeScenario) {
opts := DefaultOptions()
opts.OutOfOrderCapMax = 5
opts.OutOfOrderTimeWindow = 120 * time.Minute.Milliseconds()
@ -871,19 +880,13 @@ func TestOOOHeadChunkReader_Chunk_ConsistentQueryResponseDespiteOfHeadExpanding(
s1 := labels.FromStrings("l", "v1")
minutes := func(m int64) int64 { return m * time.Minute.Milliseconds() }
appendSample := func(app storage.Appender, l labels.Labels, timestamp int64, value float64) storage.SeriesRef {
ref, err := app.Append(0, l, timestamp, value)
require.NoError(t, err)
return ref
}
tests := []struct {
name string
queryMinT int64
queryMaxT int64
firstInOrderSampleAt int64
initialSamples chunks.SampleSlice
samplesAfterSeriesCall chunks.SampleSlice
initialSamples []testValue
samplesAfterSeriesCall []testValue
expChunkError bool
expChunksSamples []chunks.SampleSlice
}{
@ -892,21 +895,21 @@ func TestOOOHeadChunkReader_Chunk_ConsistentQueryResponseDespiteOfHeadExpanding(
queryMinT: minutes(0),
queryMaxT: minutes(100),
firstInOrderSampleAt: minutes(120),
initialSamples: chunks.SampleSlice{
initialSamples: []testValue{
// Chunk 0
sample{t: minutes(20), f: float64(0)},
sample{t: minutes(22), f: float64(0)},
sample{t: minutes(24), f: float64(0)},
sample{t: minutes(26), f: float64(0)},
sample{t: minutes(30), f: float64(0)},
{Ts: minutes(20), V: 0},
{Ts: minutes(22), V: 0},
{Ts: minutes(24), V: 0},
{Ts: minutes(26), V: 0},
{Ts: minutes(30), V: 0},
// Chunk 1 Head
sample{t: minutes(25), f: float64(1)},
sample{t: minutes(35), f: float64(1)},
{Ts: minutes(25), V: 1},
{Ts: minutes(35), V: 1},
},
samplesAfterSeriesCall: chunks.SampleSlice{
sample{t: minutes(10), f: float64(1)},
sample{t: minutes(32), f: float64(1)},
sample{t: minutes(50), f: float64(1)},
samplesAfterSeriesCall: []testValue{
{Ts: minutes(10), V: 1},
{Ts: minutes(32), V: 1},
{Ts: minutes(50), V: 1},
},
expChunkError: false,
// ts (in minutes) 0 10 20 30 40 50 60 70 80 90 100
@ -918,39 +921,39 @@ func TestOOOHeadChunkReader_Chunk_ConsistentQueryResponseDespiteOfHeadExpanding(
// Output Graphically [------------] (With 8 samples, samples newer than lastmint or older than lastmaxt are omitted but the ones in between are kept)
expChunksSamples: []chunks.SampleSlice{
{
sample{t: minutes(20), f: float64(0)},
sample{t: minutes(22), f: float64(0)},
sample{t: minutes(24), f: float64(0)},
sample{t: minutes(25), f: float64(1)},
sample{t: minutes(26), f: float64(0)},
sample{t: minutes(30), f: float64(0)},
sample{t: minutes(35), f: float64(1)},
scenario.sampleFunc(minutes(20), 0),
scenario.sampleFunc(minutes(22), 0),
scenario.sampleFunc(minutes(24), 0),
scenario.sampleFunc(minutes(25), 1),
scenario.sampleFunc(minutes(26), 0),
scenario.sampleFunc(minutes(30), 0),
scenario.sampleFunc(minutes(35), 1),
},
},
},
{
name: "After Series() previous head gets mmapped after getting samples, new head gets new samples also overlapping, none of these should appear in the response.",
name: "After Series() prev head gets mmapped after getting samples, new head gets new samples also overlapping, none of these should appear in response.",
queryMinT: minutes(0),
queryMaxT: minutes(100),
firstInOrderSampleAt: minutes(120),
initialSamples: chunks.SampleSlice{
initialSamples: []testValue{
// Chunk 0
sample{t: minutes(20), f: float64(0)},
sample{t: minutes(22), f: float64(0)},
sample{t: minutes(24), f: float64(0)},
sample{t: minutes(26), f: float64(0)},
sample{t: minutes(30), f: float64(0)},
{Ts: minutes(20), V: 0},
{Ts: minutes(22), V: 0},
{Ts: minutes(24), V: 0},
{Ts: minutes(26), V: 0},
{Ts: minutes(30), V: 0},
// Chunk 1 Head
sample{t: minutes(25), f: float64(1)},
sample{t: minutes(35), f: float64(1)},
{Ts: minutes(25), V: 1},
{Ts: minutes(35), V: 1},
},
samplesAfterSeriesCall: chunks.SampleSlice{
sample{t: minutes(10), f: float64(1)},
sample{t: minutes(32), f: float64(1)},
sample{t: minutes(50), f: float64(1)},
samplesAfterSeriesCall: []testValue{
{Ts: minutes(10), V: 1},
{Ts: minutes(32), V: 1},
{Ts: minutes(50), V: 1},
// Chunk 1 gets mmapped and Chunk 2, the new head is born
sample{t: minutes(25), f: float64(2)},
sample{t: minutes(31), f: float64(2)},
{Ts: minutes(25), V: 2},
{Ts: minutes(31), V: 2},
},
expChunkError: false,
// ts (in minutes) 0 10 20 30 40 50 60 70 80 90 100
@ -963,13 +966,13 @@ func TestOOOHeadChunkReader_Chunk_ConsistentQueryResponseDespiteOfHeadExpanding(
// Output Graphically [------------] (8 samples) It has 5 from Chunk 0 and 3 from Chunk 1
expChunksSamples: []chunks.SampleSlice{
{
sample{t: minutes(20), f: float64(0)},
sample{t: minutes(22), f: float64(0)},
sample{t: minutes(24), f: float64(0)},
sample{t: minutes(25), f: float64(1)},
sample{t: minutes(26), f: float64(0)},
sample{t: minutes(30), f: float64(0)},
sample{t: minutes(35), f: float64(1)},
scenario.sampleFunc(minutes(20), 0),
scenario.sampleFunc(minutes(22), 0),
scenario.sampleFunc(minutes(24), 0),
scenario.sampleFunc(minutes(25), 1),
scenario.sampleFunc(minutes(26), 0),
scenario.sampleFunc(minutes(30), 0),
scenario.sampleFunc(minutes(35), 1),
},
},
},
@ -980,13 +983,15 @@ func TestOOOHeadChunkReader_Chunk_ConsistentQueryResponseDespiteOfHeadExpanding(
db := newTestDBWithOpts(t, opts)
app := db.Appender(context.Background())
s1Ref := appendSample(app, s1, tc.firstInOrderSampleAt, float64(tc.firstInOrderSampleAt/1*time.Minute.Milliseconds()))
s1Ref, _, err := scenario.appendFunc(app, s1, tc.firstInOrderSampleAt, tc.firstInOrderSampleAt/1*time.Minute.Milliseconds())
require.NoError(t, err)
require.NoError(t, app.Commit())
// OOO few samples for s1.
app = db.Appender(context.Background())
for _, s := range tc.initialSamples {
appendSample(app, s1, s.T(), s.F())
_, _, err := scenario.appendFunc(app, s1, s.Ts, s.V)
require.NoError(t, err)
}
require.NoError(t, app.Commit())
@ -995,7 +1000,7 @@ func TestOOOHeadChunkReader_Chunk_ConsistentQueryResponseDespiteOfHeadExpanding(
ir := NewOOOHeadIndexReader(db.head, tc.queryMinT, tc.queryMaxT, 0)
var chks []chunks.Meta
var b labels.ScratchBuilder
err := ir.Series(s1Ref, &b, &chks)
err = ir.Series(s1Ref, &b, &chks)
require.NoError(t, err)
require.Equal(t, len(tc.expChunksSamples), len(chks))
@ -1003,7 +1008,8 @@ func TestOOOHeadChunkReader_Chunk_ConsistentQueryResponseDespiteOfHeadExpanding(
// OOO few samples for s1.
app = db.Appender(context.Background())
for _, s := range tc.samplesAfterSeriesCall {
appendSample(app, s1, s.T(), s.F())
_, _, err = scenario.appendFunc(app, s1, s.Ts, s.V)
require.NoError(t, err)
}
require.NoError(t, app.Commit())
@ -1014,13 +1020,10 @@ func TestOOOHeadChunkReader_Chunk_ConsistentQueryResponseDespiteOfHeadExpanding(
require.NoError(t, err)
require.Nil(t, c)
var resultSamples chunks.SampleSlice
it := iterable.Iterator(nil)
for it.Next() == chunkenc.ValFloat {
ts, v := it.At()
resultSamples = append(resultSamples, sample{t: ts, f: v})
}
require.Equal(t, tc.expChunksSamples[i], resultSamples)
resultSamples, err := storage.ExpandSamples(it, nil)
require.NoError(t, err)
requireEqualSamples(t, s1.String(), tc.expChunksSamples[i], resultSamples, true)
}
})
}

176
tsdb/testutil.go Normal file
View file

@ -0,0 +1,176 @@
// Copyright 2017 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package tsdb
import (
"testing"
prom_testutil "github.com/prometheus/client_golang/prometheus/testutil"
"github.com/stretchr/testify/require"
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb/chunks"
)
const (
float = "float"
)
type testValue struct {
Ts int64
V int64
CounterResetHeader histogram.CounterResetHint
}
type sampleTypeScenario struct {
sampleType string
appendFunc func(appender storage.Appender, lbls labels.Labels, ts, value int64) (storage.SeriesRef, sample, error)
sampleFunc func(ts, value int64) sample
}
// TODO: native histogram sample types will be added as part of out-of-order native histogram support; see #11220.
var sampleTypeScenarios = map[string]sampleTypeScenario{
float: {
sampleType: sampleMetricTypeFloat,
appendFunc: func(appender storage.Appender, lbls labels.Labels, ts, value int64) (storage.SeriesRef, sample, error) {
s := sample{t: ts, f: float64(value)}
ref, err := appender.Append(0, lbls, ts, s.f)
return ref, s, err
},
sampleFunc: func(ts, value int64) sample {
return sample{t: ts, f: float64(value)}
},
},
// intHistogram: {
// sampleType: sampleMetricTypeHistogram,
// appendFunc: func(appender storage.Appender, lbls labels.Labels, ts, value int64) (storage.SeriesRef, sample, error) {
// s := sample{t: ts, h: tsdbutil.GenerateTestHistogram(int(value))}
// ref, err := appender.AppendHistogram(0, lbls, ts, s.h, nil)
// return ref, s, err
// },
// sampleFunc: func(ts, value int64) sample {
// return sample{t: ts, h: tsdbutil.GenerateTestHistogram(int(value))}
// },
// },
// floatHistogram: {
// sampleType: sampleMetricTypeHistogram,
// appendFunc: func(appender storage.Appender, lbls labels.Labels, ts, value int64) (storage.SeriesRef, sample, error) {
// s := sample{t: ts, fh: tsdbutil.GenerateTestFloatHistogram(int(value))}
// ref, err := appender.AppendHistogram(0, lbls, ts, nil, s.fh)
// return ref, s, err
// },
// sampleFunc: func(ts, value int64) sample {
// return sample{t: ts, fh: tsdbutil.GenerateTestFloatHistogram(int(value))}
// },
// },
// gaugeIntHistogram: {
// sampleType: sampleMetricTypeHistogram,
// appendFunc: func(appender storage.Appender, lbls labels.Labels, ts, value int64) (storage.SeriesRef, sample, error) {
// s := sample{t: ts, h: tsdbutil.GenerateTestGaugeHistogram(int(value))}
// ref, err := appender.AppendHistogram(0, lbls, ts, s.h, nil)
// return ref, s, err
// },
// sampleFunc: func(ts, value int64) sample {
// return sample{t: ts, h: tsdbutil.GenerateTestGaugeHistogram(int(value))}
// },
// },
// gaugeFloatHistogram: {
// sampleType: sampleMetricTypeHistogram,
// appendFunc: func(appender storage.Appender, lbls labels.Labels, ts, value int64) (storage.SeriesRef, sample, error) {
// s := sample{t: ts, fh: tsdbutil.GenerateTestGaugeFloatHistogram(int(value))}
// ref, err := appender.AppendHistogram(0, lbls, ts, nil, s.fh)
// return ref, s, err
// },
// sampleFunc: func(ts, value int64) sample {
// return sample{t: ts, fh: tsdbutil.GenerateTestGaugeFloatHistogram(int(value))}
// },
// },
}
// requireEqualSeries checks that the actual series are equal to the expected ones. It ignores the counter reset hints for histograms.
func requireEqualSeries(t *testing.T, expected, actual map[string][]chunks.Sample, ignoreCounterResets bool) {
for name, expectedItem := range expected {
actualItem, ok := actual[name]
require.True(t, ok, "Expected series %s not found", name)
requireEqualSamples(t, name, expectedItem, actualItem, ignoreCounterResets)
}
for name := range actual {
_, ok := expected[name]
require.True(t, ok, "Unexpected series %s", name)
}
}
func requireEqualOOOSamples(t *testing.T, expectedSamples int, db *DB) {
require.Equal(t, float64(expectedSamples),
prom_testutil.ToFloat64(db.head.metrics.outOfOrderSamplesAppended.WithLabelValues(sampleMetricTypeFloat))+
prom_testutil.ToFloat64(db.head.metrics.outOfOrderSamplesAppended.WithLabelValues(sampleMetricTypeHistogram)),
"number of ooo appended samples mismatch")
}
func requireEqualSamples(t *testing.T, name string, expected, actual []chunks.Sample, ignoreCounterResets bool) {
require.Equal(t, len(expected), len(actual), "Length not equal to expected for %s", name)
for i, s := range expected {
expectedSample := s
actualSample := actual[i]
require.Equal(t, expectedSample.T(), actualSample.T(), "Different timestamps for %s[%d]", name, i)
require.Equal(t, expectedSample.Type().String(), actualSample.Type().String(), "Different types for %s[%d] at ts %d", name, i, expectedSample.T())
switch {
case s.H() != nil:
{
expectedHist := expectedSample.H()
actualHist := actualSample.H()
if ignoreCounterResets && expectedHist.CounterResetHint != histogram.GaugeType {
expectedHist.CounterResetHint = histogram.UnknownCounterReset
actualHist.CounterResetHint = histogram.UnknownCounterReset
} else {
require.Equal(t, expectedHist.CounterResetHint, actualHist.CounterResetHint, "Sample header doesn't match for %s[%d] at ts %d, expected: %s, actual: %s", name, i, expectedSample.T(), counterResetAsString(expectedHist.CounterResetHint), counterResetAsString(actualHist.CounterResetHint))
}
require.Equal(t, expectedHist, actualHist, "Sample doesn't match for %s[%d] at ts %d", name, i, expectedSample.T())
}
case s.FH() != nil:
{
expectedHist := expectedSample.FH()
actualHist := actualSample.FH()
if ignoreCounterResets {
expectedHist.CounterResetHint = histogram.UnknownCounterReset
actualHist.CounterResetHint = histogram.UnknownCounterReset
} else {
require.Equal(t, expectedHist.CounterResetHint, actualHist.CounterResetHint, "Sample header doesn't match for %s[%d] at ts %d, expected: %s, actual: %s", name, i, expectedSample.T(), counterResetAsString(expectedHist.CounterResetHint), counterResetAsString(actualHist.CounterResetHint))
}
require.Equal(t, expectedHist, actualHist, "Sample doesn't match for %s[%d] at ts %d", name, i, expectedSample.T())
}
default:
expectedFloat := expectedSample.F()
actualFloat := actualSample.F()
require.Equal(t, expectedFloat, actualFloat, "Sample doesn't match for %s[%d] at ts %d", name, i, expectedSample.T())
}
}
}
func counterResetAsString(h histogram.CounterResetHint) string {
switch h {
case histogram.UnknownCounterReset:
return "UnknownCounterReset"
case histogram.CounterReset:
return "CounterReset"
case histogram.NotCounterReset:
return "NotCounterReset"
case histogram.GaugeType:
return "GaugeType"
}
panic("Unexpected counter reset type")
}