Update tsdb tests to use test utils.

Co-authored-by: Fiona Liao <fiona.liao@grafana.com>
Signed-off-by: Carrie Edwards <edwrdscarrie@gmail.com>

parent 60917f628b
commit 45a32a29ef

tsdb/db_test.go | 250
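Every test touched below follows the same conversion: the original float-only body becomes an unexported test<Name>(t, scenario) function, the exported Test<Name> becomes a wrapper that runs it once per entry in sampleTypeScenarios, appends go through scenario.appendFunc, and expected samples are built with scenario.sampleFunc. The sketch below only illustrates the shape those calls imply, with simplified stand-in types and a single float scenario; it is not the actual Prometheus test utility.

    // Minimal sketch, not the real helper: stand-in types inferred from the
    // scenario.appendFunc / scenario.sampleFunc / scenario.sampleType calls in this diff.
    package main

    import "fmt"

    // sample stands in for the TSDB test sample type (timestamp + float value).
    type sample struct {
        t int64
        f float64
    }

    // appender stands in for storage.Appender; only Append is needed here.
    type appender interface {
        Append(ref uint64, lbls string, t int64, v float64) (uint64, error)
    }

    // sampleTypeScenario bundles, per sample type, how to append a sample and
    // how to build the expected sample for assertions.
    type sampleTypeScenario struct {
        sampleType string
        appendFunc func(app appender, lbls string, ts, value int64) (uint64, sample, error)
        sampleFunc func(ts, value int64) sample
    }

    // sampleTypeScenarios is the table the rewritten tests range over; only a
    // float scenario is sketched here.
    var sampleTypeScenarios = map[string]sampleTypeScenario{
        "float": {
            sampleType: "float",
            appendFunc: func(app appender, lbls string, ts, value int64) (uint64, sample, error) {
                s := sample{t: ts, f: float64(value)}
                ref, err := app.Append(0, lbls, ts, s.f)
                return ref, s, err
            },
            sampleFunc: func(ts, value int64) sample { return sample{t: ts, f: float64(value)} },
        },
    }

    // memAppender is a toy appender that records appended samples.
    type memAppender struct{ got []sample }

    func (m *memAppender) Append(_ uint64, _ string, t int64, v float64) (uint64, error) {
        m.got = append(m.got, sample{t: t, f: v})
        return 1, nil
    }

    func main() {
        app := &memAppender{}
        for name, scenario := range sampleTypeScenarios {
            _, s, err := scenario.appendFunc(app, `{foo="bar"}`, 1000, 42)
            fmt.Println(name, s, err, scenario.sampleFunc(1000, 42) == s)
        }
    }

The wrappers added in the hunks below range over exactly this kind of table via t.Run(name, ...).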
@@ -4495,6 +4495,14 @@ func TestMetadataAssertInMemoryData(t *testing.T) {
 //
 // are not included in this compaction.
 func TestOOOCompaction(t *testing.T) {
+    for name, scenario := range sampleTypeScenarios {
+        t.Run(name, func(t *testing.T) {
+            testOOOCompaction(t, scenario)
+        })
+    }
+}
+
+func testOOOCompaction(t *testing.T, scenario sampleTypeScenario) {
     dir := t.TempDir()
     ctx := context.Background()
 
@@ -4516,9 +4524,9 @@ func TestOOOCompaction(t *testing.T) {
         app := db.Appender(context.Background())
         for min := fromMins; min <= toMins; min++ {
             ts := min * time.Minute.Milliseconds()
-            _, err := app.Append(0, series1, ts, float64(ts))
+            _, _, err := scenario.appendFunc(app, series1, ts, ts)
             require.NoError(t, err)
-            _, err = app.Append(0, series2, ts, float64(2*ts))
+            _, _, err = scenario.appendFunc(app, series2, ts, 2*ts)
             require.NoError(t, err)
         }
         require.NoError(t, app.Commit())
@@ -4551,8 +4559,8 @@ func TestOOOCompaction(t *testing.T) {
         fromMins, toMins := r[0], r[1]
         for min := fromMins; min <= toMins; min++ {
             ts := min * time.Minute.Milliseconds()
-            series1Samples = append(series1Samples, sample{ts, float64(ts), nil, nil})
-            series2Samples = append(series2Samples, sample{ts, float64(2 * ts), nil, nil})
+            series1Samples = append(series1Samples, scenario.sampleFunc(ts, ts))
+            series2Samples = append(series2Samples, scenario.sampleFunc(ts, 2*ts))
         }
     }
     expRes := map[string][]chunks.Sample{
@@ -4564,7 +4572,7 @@ func TestOOOCompaction(t *testing.T) {
         require.NoError(t, err)
 
         actRes := query(t, q, labels.MustNewMatcher(labels.MatchRegexp, "foo", "bar.*"))
-        require.Equal(t, expRes, actRes)
+        requireEqualSamples(t, expRes, actRes, true)
     }
 
     verifyDBSamples() // Before any compaction.
@@ -4619,8 +4627,8 @@ func TestOOOCompaction(t *testing.T) {
         series2Samples := make([]chunks.Sample, 0, toMins-fromMins+1)
         for min := fromMins; min <= toMins; min++ {
             ts := min * time.Minute.Milliseconds()
-            series1Samples = append(series1Samples, sample{ts, float64(ts), nil, nil})
-            series2Samples = append(series2Samples, sample{ts, float64(2 * ts), nil, nil})
+            series1Samples = append(series1Samples, scenario.sampleFunc(ts, ts))
+            series2Samples = append(series2Samples, scenario.sampleFunc(ts, 2*ts))
         }
         expRes := map[string][]chunks.Sample{
             series1.String(): series1Samples,
@@ -4631,7 +4639,7 @@ func TestOOOCompaction(t *testing.T) {
         require.NoError(t, err)
 
         actRes := query(t, q, labels.MustNewMatcher(labels.MatchRegexp, "foo", "bar.*"))
-        require.Equal(t, expRes, actRes)
+        requireEqualSamples(t, expRes, actRes, true)
     }
 
     // Checking for expected data in the blocks.
@@ -4675,6 +4683,14 @@ func TestOOOCompaction(t *testing.T) {
 // TestOOOCompactionWithNormalCompaction tests if OOO compaction is performed
 // when the normal head's compaction is done.
 func TestOOOCompactionWithNormalCompaction(t *testing.T) {
+    for name, scenario := range sampleTypeScenarios {
+        t.Run(name, func(t *testing.T) {
+            testOOOCompactionWithNormalCompaction(t, scenario)
+        })
+    }
+}
+
+func testOOOCompactionWithNormalCompaction(t *testing.T, scenario sampleTypeScenario) {
     dir := t.TempDir()
     ctx := context.Background()
 
@@ -4696,9 +4712,9 @@ func TestOOOCompactionWithNormalCompaction(t *testing.T) {
         app := db.Appender(context.Background())
         for min := fromMins; min <= toMins; min++ {
             ts := min * time.Minute.Milliseconds()
-            _, err := app.Append(0, series1, ts, float64(ts))
+            _, _, err := scenario.appendFunc(app, series1, ts, ts)
             require.NoError(t, err)
-            _, err = app.Append(0, series2, ts, float64(2*ts))
+            _, _, err = scenario.appendFunc(app, series2, ts, 2*ts)
             require.NoError(t, err)
         }
         require.NoError(t, app.Commit())
@@ -4751,8 +4767,8 @@ func TestOOOCompactionWithNormalCompaction(t *testing.T) {
         series2Samples := make([]chunks.Sample, 0, toMins-fromMins+1)
         for min := fromMins; min <= toMins; min++ {
             ts := min * time.Minute.Milliseconds()
-            series1Samples = append(series1Samples, sample{ts, float64(ts), nil, nil})
-            series2Samples = append(series2Samples, sample{ts, float64(2 * ts), nil, nil})
+            series1Samples = append(series1Samples, scenario.sampleFunc(ts, ts))
+            series2Samples = append(series2Samples, scenario.sampleFunc(ts, 2*ts))
         }
         expRes := map[string][]chunks.Sample{
             series1.String(): series1Samples,
@@ -4763,7 +4779,7 @@ func TestOOOCompactionWithNormalCompaction(t *testing.T) {
         require.NoError(t, err)
 
         actRes := query(t, q, labels.MustNewMatcher(labels.MatchRegexp, "foo", "bar.*"))
-        require.Equal(t, expRes, actRes)
+        requireEqualSamples(t, expRes, actRes, true)
     }
 
     // Checking for expected data in the blocks.
@@ -4775,6 +4791,14 @@ func TestOOOCompactionWithNormalCompaction(t *testing.T) {
 // configured to not have wal and wbl but its able to compact both the in-order
 // and out-of-order head.
 func TestOOOCompactionWithDisabledWriteLog(t *testing.T) {
+    for name, scenario := range sampleTypeScenarios {
+        t.Run(name, func(t *testing.T) {
+            testOOOCompactionWithDisabledWriteLog(t, scenario)
+        })
+    }
+}
+
+func testOOOCompactionWithDisabledWriteLog(t *testing.T, scenario sampleTypeScenario) {
     dir := t.TempDir()
     ctx := context.Background()
 
@@ -4797,9 +4821,9 @@ func TestOOOCompactionWithDisabledWriteLog(t *testing.T) {
         app := db.Appender(context.Background())
         for min := fromMins; min <= toMins; min++ {
             ts := min * time.Minute.Milliseconds()
-            _, err := app.Append(0, series1, ts, float64(ts))
+            _, _, err := scenario.appendFunc(app, series1, ts, ts)
             require.NoError(t, err)
-            _, err = app.Append(0, series2, ts, float64(2*ts))
+            _, _, err = scenario.appendFunc(app, series2, ts, 2*ts)
             require.NoError(t, err)
         }
         require.NoError(t, app.Commit())
@@ -4852,8 +4876,8 @@ func TestOOOCompactionWithDisabledWriteLog(t *testing.T) {
         series2Samples := make([]chunks.Sample, 0, toMins-fromMins+1)
         for min := fromMins; min <= toMins; min++ {
             ts := min * time.Minute.Milliseconds()
-            series1Samples = append(series1Samples, sample{ts, float64(ts), nil, nil})
-            series2Samples = append(series2Samples, sample{ts, float64(2 * ts), nil, nil})
+            series1Samples = append(series1Samples, scenario.sampleFunc(ts, ts))
+            series2Samples = append(series2Samples, scenario.sampleFunc(ts, 2*ts))
         }
         expRes := map[string][]chunks.Sample{
             series1.String(): series1Samples,
@@ -4864,7 +4888,7 @@ func TestOOOCompactionWithDisabledWriteLog(t *testing.T) {
         require.NoError(t, err)
 
         actRes := query(t, q, labels.MustNewMatcher(labels.MatchRegexp, "foo", "bar.*"))
-        require.Equal(t, expRes, actRes)
+        requireEqualSamples(t, expRes, actRes, true)
     }
 
     // Checking for expected data in the blocks.
@@ -4876,6 +4900,14 @@ func TestOOOCompactionWithDisabledWriteLog(t *testing.T) {
 // missing after a restart while snapshot was enabled, but the query still returns the right
 // data from the mmap chunks.
 func TestOOOQueryAfterRestartWithSnapshotAndRemovedWBL(t *testing.T) {
+    for name, scenario := range sampleTypeScenarios {
+        t.Run(name, func(t *testing.T) {
+            testOOOQueryAfterRestartWithSnapshotAndRemovedWBL(t, scenario)
+        })
+    }
+}
+
+func testOOOQueryAfterRestartWithSnapshotAndRemovedWBL(t *testing.T, scenario sampleTypeScenario) {
     dir := t.TempDir()
     ctx := context.Background()
 
@@ -4898,9 +4930,9 @@ func TestOOOQueryAfterRestartWithSnapshotAndRemovedWBL(t *testing.T) {
         app := db.Appender(context.Background())
         for min := fromMins; min <= toMins; min++ {
             ts := min * time.Minute.Milliseconds()
-            _, err := app.Append(0, series1, ts, float64(ts))
+            _, _, err := scenario.appendFunc(app, series1, ts, ts)
             require.NoError(t, err)
-            _, err = app.Append(0, series2, ts, float64(2*ts))
+            _, _, err = scenario.appendFunc(app, series2, ts, 2*ts)
             require.NoError(t, err)
         }
         require.NoError(t, app.Commit())
@@ -4946,8 +4978,8 @@ func TestOOOQueryAfterRestartWithSnapshotAndRemovedWBL(t *testing.T) {
         series2Samples := make([]chunks.Sample, 0, toMins-fromMins+1)
         for min := fromMins; min <= toMins; min++ {
             ts := min * time.Minute.Milliseconds()
-            series1Samples = append(series1Samples, sample{ts, float64(ts), nil, nil})
-            series2Samples = append(series2Samples, sample{ts, float64(2 * ts), nil, nil})
+            series1Samples = append(series1Samples, scenario.sampleFunc(ts, ts))
+            series2Samples = append(series2Samples, scenario.sampleFunc(ts, ts*2))
         }
         expRes := map[string][]chunks.Sample{
             series1.String(): series1Samples,
@@ -4958,7 +4990,7 @@ func TestOOOQueryAfterRestartWithSnapshotAndRemovedWBL(t *testing.T) {
         require.NoError(t, err)
 
         actRes := query(t, q, labels.MustNewMatcher(labels.MatchRegexp, "foo", "bar.*"))
-        require.Equal(t, expRes, actRes)
+        requireEqualSamples(t, expRes, actRes, true)
     }
 
     // Checking for expected ooo data from mmap chunks.
@@ -5159,6 +5191,14 @@ func Test_ChunkQuerier_OOOQuery(t *testing.T) {
 }
 
 func TestOOOAppendAndQuery(t *testing.T) {
+    for name, scenario := range sampleTypeScenarios {
+        t.Run(name, func(t *testing.T) {
+            testOOOAppendAndQuery(t, scenario)
+        })
+    }
+}
+
+func testOOOAppendAndQuery(t *testing.T, scenario sampleTypeScenario) {
     opts := DefaultOptions()
     opts.OutOfOrderCapMax = 30
     opts.OutOfOrderTimeWindow = 4 * time.Hour.Milliseconds()
@@ -5180,13 +5220,13 @@ func TestOOOAppendAndQuery(t *testing.T) {
         key := lbls.String()
         from, to := minutes(fromMins), minutes(toMins)
         for min := from; min <= to; min += time.Minute.Milliseconds() {
-            val := rand.Float64()
-            _, err := app.Append(0, lbls, min, val)
+            val := rand.Intn(1000)
+            _, s, err := scenario.appendFunc(app, lbls, min, int64(val))
             if faceError {
                 require.Error(t, err)
             } else {
                 require.NoError(t, err)
-                appendedSamples[key] = append(appendedSamples[key], sample{t: min, f: val})
+                appendedSamples[key] = append(appendedSamples[key], s)
                 totalSamples++
             }
         }
@@ -5222,7 +5262,7 @@ func TestOOOAppendAndQuery(t *testing.T) {
                 expSamples[k] = append(expSamples[k], s)
             }
         }
-        require.Equal(t, expSamples, seriesSet)
+        requireEqualSamples(t, expSamples, seriesSet, true)
         requireEqualOOOSamples(t, totalSamples-2, db)
     }
 
@@ -5284,6 +5324,14 @@ func TestOOOAppendAndQuery(t *testing.T) {
 }
 
 func TestOOODisabled(t *testing.T) {
+    for name, scenario := range sampleTypeScenarios {
+        t.Run(name, func(t *testing.T) {
+            testOOODisabled(t, scenario)
+        })
+    }
+}
+
+func testOOODisabled(t *testing.T, scenario sampleTypeScenario) {
     opts := DefaultOptions()
     opts.OutOfOrderTimeWindow = 0
     db := openTestDB(t, opts, nil)
@@ -5297,19 +5345,19 @@ func TestOOODisabled(t *testing.T) {
     expSamples := make(map[string][]chunks.Sample)
     totalSamples := 0
     failedSamples := 0
-    addSample := func(lbls labels.Labels, fromMins, toMins int64, faceError bool) {
+
+    addSample := func(db *DB, lbls labels.Labels, fromMins, toMins int64, faceError bool) {
         app := db.Appender(context.Background())
         key := lbls.String()
         from, to := minutes(fromMins), minutes(toMins)
         for min := from; min <= to; min += time.Minute.Milliseconds() {
-            val := rand.Float64()
-            _, err := app.Append(0, lbls, min, val)
+            _, _, err := scenario.appendFunc(app, lbls, min, min)
             if faceError {
                 require.Error(t, err)
                 failedSamples++
             } else {
                 require.NoError(t, err)
-                expSamples[key] = append(expSamples[key], sample{t: min, f: val})
+                expSamples[key] = append(expSamples[key], scenario.sampleFunc(min, min))
                 totalSamples++
             }
         }
@@ -5320,21 +5368,21 @@ func TestOOODisabled(t *testing.T) {
         }
     }
 
-    addSample(s1, 300, 300, false) // In-order samples.
-    addSample(s1, 250, 260, true)  // Some ooo samples.
-    addSample(s1, 59, 59, true)    // Out of time window.
-    addSample(s1, 60, 65, true)    // At the edge of time window, also it would be "out of bound" without the ooo support.
-    addSample(s1, 59, 59, true)    // Out of time window again.
-    addSample(s1, 301, 310, false) // More in-order samples.
+    addSample(db, s1, 300, 300, false) // In-order samples.
+    addSample(db, s1, 250, 260, true)  // Some ooo samples.
+    addSample(db, s1, 59, 59, true)    // Out of time window.
+    addSample(db, s1, 60, 65, true)    // At the edge of time window, also it would be "out of bound" without the ooo support.
+    addSample(db, s1, 59, 59, true)    // Out of time window again.
+    addSample(db, s1, 301, 310, false) // More in-order samples.
 
     querier, err := db.Querier(math.MinInt64, math.MaxInt64)
     require.NoError(t, err)
 
     seriesSet := query(t, querier, labels.MustNewMatcher(labels.MatchRegexp, "foo", "bar."))
-    require.Equal(t, expSamples, seriesSet)
+    requireEqualSamples(t, expSamples, seriesSet, true)
     requireEqualOOOSamples(t, 0, db)
     require.Equal(t, float64(failedSamples),
-        prom_testutil.ToFloat64(db.head.metrics.outOfOrderSamples.WithLabelValues(sampleMetricTypeFloat))+prom_testutil.ToFloat64(db.head.metrics.outOfBoundSamples.WithLabelValues(sampleMetricTypeFloat)),
+        prom_testutil.ToFloat64(db.head.metrics.outOfOrderSamples.WithLabelValues(scenario.sampleType))+prom_testutil.ToFloat64(db.head.metrics.outOfBoundSamples.WithLabelValues(scenario.sampleType)),
         "number of ooo/oob samples mismatch")
 
     // Verifying that no OOO artifacts were generated.
@@ -5349,6 +5397,14 @@ func TestOOODisabled(t *testing.T) {
 }
 
 func TestWBLAndMmapReplay(t *testing.T) {
+    for name, scenario := range sampleTypeScenarios {
+        t.Run(name, func(t *testing.T) {
+            testWBLAndMmapReplay(t, scenario)
+        })
+    }
+}
+
+func testWBLAndMmapReplay(t *testing.T, scenario sampleTypeScenario) {
     opts := DefaultOptions()
     opts.OutOfOrderCapMax = 30
     opts.OutOfOrderTimeWindow = 4 * time.Hour.Milliseconds()
@@ -5369,10 +5425,10 @@ func TestWBLAndMmapReplay(t *testing.T) {
         key := lbls.String()
         from, to := minutes(fromMins), minutes(toMins)
         for min := from; min <= to; min += time.Minute.Milliseconds() {
-            val := rand.Float64()
-            _, err := app.Append(0, lbls, min, val)
+            val := rand.Intn(1000)
+            _, s, err := scenario.appendFunc(app, lbls, min, int64(val))
             require.NoError(t, err)
-            expSamples[key] = append(expSamples[key], sample{t: min, f: val})
+            expSamples[key] = append(expSamples[key], s)
             totalSamples++
         }
         require.NoError(t, app.Commit())
@@ -5390,7 +5446,7 @@ func TestWBLAndMmapReplay(t *testing.T) {
             })
             exp[k] = v
         }
-        require.Equal(t, exp, seriesSet)
+        requireEqualSamples(t, exp, seriesSet, true)
     }
 
     // In-order samples.
@@ -5413,10 +5469,7 @@ func TestWBLAndMmapReplay(t *testing.T) {
         chk, err := db.head.chunkDiskMapper.Chunk(mc.ref)
         require.NoError(t, err)
         it := chk.Iterator(nil)
-        for it.Next() == chunkenc.ValFloat {
-            ts, val := it.At()
-            s1MmapSamples = append(s1MmapSamples, sample{t: ts, f: val})
-        }
+        s1MmapSamples = append(s1MmapSamples, samplesFromIterator(t, it)...)
     }
     require.NotEmpty(t, s1MmapSamples)
 
@@ -5534,6 +5587,14 @@ func TestWBLAndMmapReplay(t *testing.T) {
 }
 
 func TestOOOCompactionFailure(t *testing.T) {
+    for name, scenario := range sampleTypeScenarios {
+        t.Run(name, func(t *testing.T) {
+            testOOOCompactionFailure(t, scenario)
+        })
+    }
+}
+
+func testOOOCompactionFailure(t *testing.T, scenario sampleTypeScenario) {
     dir := t.TempDir()
     ctx := context.Background()
 
@@ -5554,7 +5615,7 @@ func TestOOOCompactionFailure(t *testing.T) {
         app := db.Appender(context.Background())
         for min := fromMins; min <= toMins; min++ {
             ts := min * time.Minute.Milliseconds()
-            _, err := app.Append(0, series1, ts, float64(ts))
+            _, _, err := scenario.appendFunc(app, series1, ts, ts)
             require.NoError(t, err)
         }
         require.NoError(t, app.Commit())
@@ -5642,7 +5703,7 @@ func TestOOOCompactionFailure(t *testing.T) {
         series1Samples := make([]chunks.Sample, 0, toMins-fromMins+1)
         for min := fromMins; min <= toMins; min++ {
             ts := min * time.Minute.Milliseconds()
-            series1Samples = append(series1Samples, sample{ts, float64(ts), nil, nil})
+            series1Samples = append(series1Samples, scenario.sampleFunc(ts, ts))
         }
         expRes := map[string][]chunks.Sample{
             series1.String(): series1Samples,
@@ -5650,9 +5711,8 @@ func TestOOOCompactionFailure(t *testing.T) {
 
         q, err := NewBlockQuerier(block, math.MinInt64, math.MaxInt64)
         require.NoError(t, err)
-
         actRes := query(t, q, labels.MustNewMatcher(labels.MatchRegexp, "foo", "bar.*"))
-        require.Equal(t, expRes, actRes)
+        requireEqualSamples(t, expRes, actRes, true)
     }
 
     // Checking for expected data in the blocks.
@@ -5819,6 +5879,14 @@ func TestWBLCorruption(t *testing.T) {
 }
 
 func TestOOOMmapCorruption(t *testing.T) {
+    for name, scenario := range sampleTypeScenarios {
+        t.Run(name, func(t *testing.T) {
+            testOOOMmapCorruption(t, scenario)
+        })
+    }
+}
+
+func testOOOMmapCorruption(t *testing.T, scenario sampleTypeScenario) {
     dir := t.TempDir()
 
     opts := DefaultOptions()
@@ -5838,11 +5906,11 @@ func TestOOOMmapCorruption(t *testing.T) {
         app := db.Appender(context.Background())
         for min := fromMins; min <= toMins; min++ {
             ts := min * time.Minute.Milliseconds()
-            _, err := app.Append(0, series1, ts, float64(ts))
+            _, s, err := scenario.appendFunc(app, series1, ts, ts)
             require.NoError(t, err)
-            allSamples = append(allSamples, sample{t: ts, f: float64(ts)})
+            allSamples = append(allSamples, s)
             if inMmapAfterCorruption {
-                expInMmapChunks = append(expInMmapChunks, sample{t: ts, f: float64(ts)})
+                expInMmapChunks = append(expInMmapChunks, s)
             }
         }
         require.NoError(t, app.Commit())
@@ -5880,7 +5948,7 @@ func TestOOOMmapCorruption(t *testing.T) {
         require.NoError(t, err)
 
         actRes := query(t, q, labels.MustNewMatcher(labels.MatchRegexp, "foo", "bar.*"))
-        require.Equal(t, expRes, actRes)
+        requireEqualSamples(t, expRes, actRes, true)
     }
 
     verifySamples(allSamples)
@@ -5942,6 +6010,14 @@ func TestOOOMmapCorruption(t *testing.T) {
 }
 
 func TestOutOfOrderRuntimeConfig(t *testing.T) {
+    for name, scenario := range sampleTypeScenarios {
+        t.Run(name, func(t *testing.T) {
+            testOutOfOrderRuntimeConfig(t, scenario)
+        })
+    }
+}
+
+func testOutOfOrderRuntimeConfig(t *testing.T, scenario sampleTypeScenario) {
     ctx := context.Background()
 
     getDB := func(oooTimeWindow int64) *DB {
@@ -5975,10 +6051,10 @@ func TestOutOfOrderRuntimeConfig(t *testing.T) {
         app := db.Appender(context.Background())
         for min := fromMins; min <= toMins; min++ {
             ts := min * time.Minute.Milliseconds()
-            _, err := app.Append(0, series1, ts, float64(ts))
+            _, s, err := scenario.appendFunc(app, series1, ts, ts)
             if success {
                 require.NoError(t, err)
-                allSamples = append(allSamples, sample{t: ts, f: float64(ts)})
+                allSamples = append(allSamples, s)
             } else {
                 require.Error(t, err)
             }
@@ -6000,7 +6076,7 @@ func TestOutOfOrderRuntimeConfig(t *testing.T) {
         require.NoError(t, err)
 
         actRes := query(t, q, labels.MustNewMatcher(labels.MatchRegexp, "foo", "bar.*"))
-        require.Equal(t, expRes, actRes)
+        requireEqualSamples(t, expRes, actRes, true)
     }
 
     doOOOCompaction := func(t *testing.T, db *DB) {
@@ -6173,12 +6249,20 @@ func TestOutOfOrderRuntimeConfig(t *testing.T) {
 }
 
 func TestNoGapAfterRestartWithOOO(t *testing.T) {
+    for name, scenario := range sampleTypeScenarios {
+        t.Run(name, func(t *testing.T) {
+            testNoGapAfterRestartWithOOO(t, scenario)
+        })
+    }
+}
+
+func testNoGapAfterRestartWithOOO(t *testing.T, scenario sampleTypeScenario) {
     series1 := labels.FromStrings("foo", "bar1")
     addSamples := func(t *testing.T, db *DB, fromMins, toMins int64, success bool) {
         app := db.Appender(context.Background())
         for min := fromMins; min <= toMins; min++ {
             ts := min * time.Minute.Milliseconds()
-            _, err := app.Append(0, series1, ts, float64(ts))
+            _, _, err := scenario.appendFunc(app, series1, ts, ts)
             if success {
                 require.NoError(t, err)
             } else {
@@ -6192,7 +6276,7 @@ func TestNoGapAfterRestartWithOOO(t *testing.T) {
         var expSamples []chunks.Sample
         for min := fromMins; min <= toMins; min++ {
             ts := min * time.Minute.Milliseconds()
-            expSamples = append(expSamples, sample{t: ts, f: float64(ts)})
+            expSamples = append(expSamples, scenario.sampleFunc(ts, ts))
         }
 
         expRes := map[string][]chunks.Sample{
@@ -6203,7 +6287,7 @@ func TestNoGapAfterRestartWithOOO(t *testing.T) {
         require.NoError(t, err)
 
         actRes := query(t, q, labels.MustNewMatcher(labels.MatchRegexp, "foo", "bar.*"))
-        require.Equal(t, expRes, actRes)
+        requireEqualSamples(t, expRes, actRes, true)
     }
 
     cases := []struct {
@@ -6280,6 +6364,14 @@ func TestNoGapAfterRestartWithOOO(t *testing.T) {
 }
 
 func TestWblReplayAfterOOODisableAndRestart(t *testing.T) {
+    for name, scenario := range sampleTypeScenarios {
+        t.Run(name, func(t *testing.T) {
+            testWblReplayAfterOOODisableAndRestart(t, scenario)
+        })
+    }
+}
+
+func testWblReplayAfterOOODisableAndRestart(t *testing.T, scenario sampleTypeScenario) {
     dir := t.TempDir()
 
     opts := DefaultOptions()
@@ -6298,9 +6390,9 @@ func TestWblReplayAfterOOODisableAndRestart(t *testing.T) {
         app := db.Appender(context.Background())
         for min := fromMins; min <= toMins; min++ {
             ts := min * time.Minute.Milliseconds()
-            _, err := app.Append(0, series1, ts, float64(ts))
+            _, s, err := scenario.appendFunc(app, series1, ts, ts)
             require.NoError(t, err)
-            allSamples = append(allSamples, sample{t: ts, f: float64(ts)})
+            allSamples = append(allSamples, s)
         }
         require.NoError(t, app.Commit())
     }
@@ -6323,7 +6415,7 @@ func TestWblReplayAfterOOODisableAndRestart(t *testing.T) {
         require.NoError(t, err)
 
         actRes := query(t, q, labels.MustNewMatcher(labels.MatchRegexp, "foo", "bar.*"))
-        require.Equal(t, expRes, actRes)
+        requireEqualSamples(t, expRes, actRes, true)
     }
 
     verifySamples(allSamples)
@@ -6339,6 +6431,14 @@ func TestWblReplayAfterOOODisableAndRestart(t *testing.T) {
 }
 
 func TestPanicOnApplyConfig(t *testing.T) {
+    for name, scenario := range sampleTypeScenarios {
+        t.Run(name, func(t *testing.T) {
+            testPanicOnApplyConfig(t, scenario)
+        })
+    }
+}
+
+func testPanicOnApplyConfig(t *testing.T, scenario sampleTypeScenario) {
     dir := t.TempDir()
 
     opts := DefaultOptions()
@@ -6357,9 +6457,9 @@ func TestPanicOnApplyConfig(t *testing.T) {
         app := db.Appender(context.Background())
         for min := fromMins; min <= toMins; min++ {
             ts := min * time.Minute.Milliseconds()
-            _, err := app.Append(0, series1, ts, float64(ts))
+            _, s, err := scenario.appendFunc(app, series1, ts, ts)
             require.NoError(t, err)
-            allSamples = append(allSamples, sample{t: ts, f: float64(ts)})
+            allSamples = append(allSamples, s)
         }
         require.NoError(t, app.Commit())
     }
@@ -6387,6 +6487,14 @@ func TestPanicOnApplyConfig(t *testing.T) {
 }
 
 func TestDiskFillingUpAfterDisablingOOO(t *testing.T) {
+    for name, scenario := range sampleTypeScenarios {
+        t.Run(name, func(t *testing.T) {
+            testDiskFillingUpAfterDisablingOOO(t, scenario)
+        })
+    }
+}
+
+func testDiskFillingUpAfterDisablingOOO(t *testing.T, scenario sampleTypeScenario) {
     dir := t.TempDir()
     ctx := context.Background()
 
@@ -6406,9 +6514,9 @@ func TestDiskFillingUpAfterDisablingOOO(t *testing.T) {
         app := db.Appender(context.Background())
         for min := fromMins; min <= toMins; min++ {
             ts := min * time.Minute.Milliseconds()
-            _, err := app.Append(0, series1, ts, float64(ts))
+            _, s, err := scenario.appendFunc(app, series1, ts, ts)
             require.NoError(t, err)
-            allSamples = append(allSamples, sample{t: ts, f: float64(ts)})
+            allSamples = append(allSamples, s)
         }
         require.NoError(t, app.Commit())
     }
@@ -7060,12 +7168,6 @@ Outer:
     require.NoError(t, writerErr)
 }
 
-func requireEqualOOOSamples(t *testing.T, expectedSamples int, db *DB) {
-    require.Equal(t, float64(expectedSamples),
-        prom_testutil.ToFloat64(db.head.metrics.outOfOrderSamplesAppended.WithLabelValues(sampleMetricTypeFloat)),
-        "number of ooo appended samples mismatch")
-}
-
 type mockCompactorFn struct {
     planFn    func() ([]string, error)
     compactFn func() ([]ulid.ULID, error)

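The hunk above deletes the float-only requireEqualOOOSamples definition from this file while calls to it and to requireEqualSamples remain in the tests, so scenario-aware replacements presumably live in the shared test utilities, which this diff does not show. As a rough illustration of the per-series comparison such a helper performs, under simplified assumptions (stand-in sample type, hypothetical function name, none of this is the real utility):

    // Illustrative sketch only; the real helper also understands histogram
    // sample types and the extra boolean flag passed in this diff.
    package tsdbtestsketch

    import (
        "testing"

        "github.com/stretchr/testify/require"
    )

    // sample is a simplified stand-in for the TSDB test sample type.
    type sample struct {
        t int64
        f float64
    }

    // requireEqualSampleMaps compares expected and actual series->samples maps
    // series by series, which gives clearer failure output than one
    // require.Equal over the whole map.
    func requireEqualSampleMaps(t *testing.T, exp, act map[string][]sample) {
        t.Helper()
        require.Len(t, act, len(exp), "unexpected number of series")
        for name, expSamples := range exp {
            require.Equal(t, expSamples, act[name], "samples mismatch for series %s", name)
        }
    }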
@@ -2665,6 +2665,14 @@ func TestIsolationWithoutAdd(t *testing.T) {
 }
 
 func TestOutOfOrderSamplesMetric(t *testing.T) {
+    for name, scenario := range sampleTypeScenarios {
+        t.Run(name, func(t *testing.T) {
+            testOutOfOrderSamplesMetric(t, scenario)
+        })
+    }
+}
+
+func testOutOfOrderSamplesMetric(t *testing.T, scenario sampleTypeScenario) {
     dir := t.TempDir()
 
     db, err := Open(dir, nil, nil, DefaultOptions(), nil)
@@ -2674,33 +2682,38 @@ func TestOutOfOrderSamplesMetric(t *testing.T) {
     }()
     db.DisableCompactions()
 
+    appendSample := func(appender storage.Appender, ts int64) (storage.SeriesRef, error) {
+        ref, _, err := scenario.appendFunc(appender, labels.FromStrings("a", "b"), ts, 99)
+        return ref, err
+    }
+
     ctx := context.Background()
     app := db.Appender(ctx)
     for i := 1; i <= 5; i++ {
-        _, err = app.Append(0, labels.FromStrings("a", "b"), int64(i), 99)
+        _, err = appendSample(app, int64(i))
         require.NoError(t, err)
     }
     require.NoError(t, app.Commit())
 
     // Test out of order metric.
-    require.Equal(t, 0.0, prom_testutil.ToFloat64(db.head.metrics.outOfOrderSamples.WithLabelValues(sampleMetricTypeFloat)))
+    require.Equal(t, 0.0, prom_testutil.ToFloat64(db.head.metrics.outOfOrderSamples.WithLabelValues(scenario.sampleType)))
     app = db.Appender(ctx)
-    _, err = app.Append(0, labels.FromStrings("a", "b"), 2, 99)
+    _, err = appendSample(app, 2)
     require.Equal(t, storage.ErrOutOfOrderSample, err)
-    require.Equal(t, 1.0, prom_testutil.ToFloat64(db.head.metrics.outOfOrderSamples.WithLabelValues(sampleMetricTypeFloat)))
+    require.Equal(t, 1.0, prom_testutil.ToFloat64(db.head.metrics.outOfOrderSamples.WithLabelValues(scenario.sampleType)))
 
-    _, err = app.Append(0, labels.FromStrings("a", "b"), 3, 99)
+    _, err = appendSample(app, 3)
     require.Equal(t, storage.ErrOutOfOrderSample, err)
-    require.Equal(t, 2.0, prom_testutil.ToFloat64(db.head.metrics.outOfOrderSamples.WithLabelValues(sampleMetricTypeFloat)))
+    require.Equal(t, 2.0, prom_testutil.ToFloat64(db.head.metrics.outOfOrderSamples.WithLabelValues(scenario.sampleType)))
 
-    _, err = app.Append(0, labels.FromStrings("a", "b"), 4, 99)
+    _, err = appendSample(app, 4)
     require.Equal(t, storage.ErrOutOfOrderSample, err)
-    require.Equal(t, 3.0, prom_testutil.ToFloat64(db.head.metrics.outOfOrderSamples.WithLabelValues(sampleMetricTypeFloat)))
+    require.Equal(t, 3.0, prom_testutil.ToFloat64(db.head.metrics.outOfOrderSamples.WithLabelValues(scenario.sampleType)))
     require.NoError(t, app.Commit())
 
     // Compact Head to test out of bound metric.
     app = db.Appender(ctx)
-    _, err = app.Append(0, labels.FromStrings("a", "b"), DefaultBlockDuration*2, 99)
+    _, err = appendSample(app, DefaultBlockDuration*2)
     require.NoError(t, err)
     require.NoError(t, app.Commit())
 
@@ -2709,36 +2722,36 @@ func TestOutOfOrderSamplesMetric(t *testing.T) {
     require.Greater(t, db.head.minValidTime.Load(), int64(0))
 
     app = db.Appender(ctx)
-    _, err = app.Append(0, labels.FromStrings("a", "b"), db.head.minValidTime.Load()-2, 99)
+    _, err = appendSample(app, db.head.minValidTime.Load()-2)
     require.Equal(t, storage.ErrOutOfBounds, err)
-    require.Equal(t, 1.0, prom_testutil.ToFloat64(db.head.metrics.outOfBoundSamples.WithLabelValues(sampleMetricTypeFloat)))
+    require.Equal(t, 1.0, prom_testutil.ToFloat64(db.head.metrics.outOfBoundSamples.WithLabelValues(scenario.sampleType)))
 
-    _, err = app.Append(0, labels.FromStrings("a", "b"), db.head.minValidTime.Load()-1, 99)
+    _, err = appendSample(app, db.head.minValidTime.Load()-1)
     require.Equal(t, storage.ErrOutOfBounds, err)
-    require.Equal(t, 2.0, prom_testutil.ToFloat64(db.head.metrics.outOfBoundSamples.WithLabelValues(sampleMetricTypeFloat)))
+    require.Equal(t, 2.0, prom_testutil.ToFloat64(db.head.metrics.outOfBoundSamples.WithLabelValues(scenario.sampleType)))
     require.NoError(t, app.Commit())
 
     // Some more valid samples for out of order.
     app = db.Appender(ctx)
     for i := 1; i <= 5; i++ {
-        _, err = app.Append(0, labels.FromStrings("a", "b"), db.head.minValidTime.Load()+DefaultBlockDuration+int64(i), 99)
+        _, err = appendSample(app, db.head.minValidTime.Load()+DefaultBlockDuration+int64(i))
         require.NoError(t, err)
     }
     require.NoError(t, app.Commit())
 
     // Test out of order metric.
     app = db.Appender(ctx)
-    _, err = app.Append(0, labels.FromStrings("a", "b"), db.head.minValidTime.Load()+DefaultBlockDuration+2, 99)
+    _, err = appendSample(app, db.head.minValidTime.Load()+DefaultBlockDuration+2)
     require.Equal(t, storage.ErrOutOfOrderSample, err)
-    require.Equal(t, 4.0, prom_testutil.ToFloat64(db.head.metrics.outOfOrderSamples.WithLabelValues(sampleMetricTypeFloat)))
+    require.Equal(t, 4.0, prom_testutil.ToFloat64(db.head.metrics.outOfOrderSamples.WithLabelValues(scenario.sampleType)))
 
-    _, err = app.Append(0, labels.FromStrings("a", "b"), db.head.minValidTime.Load()+DefaultBlockDuration+3, 99)
+    _, err = appendSample(app, db.head.minValidTime.Load()+DefaultBlockDuration+3)
     require.Equal(t, storage.ErrOutOfOrderSample, err)
-    require.Equal(t, 5.0, prom_testutil.ToFloat64(db.head.metrics.outOfOrderSamples.WithLabelValues(sampleMetricTypeFloat)))
+    require.Equal(t, 5.0, prom_testutil.ToFloat64(db.head.metrics.outOfOrderSamples.WithLabelValues(scenario.sampleType)))
 
-    _, err = app.Append(0, labels.FromStrings("a", "b"), db.head.minValidTime.Load()+DefaultBlockDuration+4, 99)
+    _, err = appendSample(app, db.head.minValidTime.Load()+DefaultBlockDuration+4)
     require.Equal(t, storage.ErrOutOfOrderSample, err)
-    require.Equal(t, 6.0, prom_testutil.ToFloat64(db.head.metrics.outOfOrderSamples.WithLabelValues(sampleMetricTypeFloat)))
+    require.Equal(t, 6.0, prom_testutil.ToFloat64(db.head.metrics.outOfOrderSamples.WithLabelValues(scenario.sampleType)))
     require.NoError(t, app.Commit())
 }
 
@@ -4801,6 +4814,14 @@ func TestWBLReplay(t *testing.T) {
 
 // TestOOOMmapReplay checks the replay at a low level.
 func TestOOOMmapReplay(t *testing.T) {
+    for name, scenario := range sampleTypeScenarios {
+        t.Run(name, func(t *testing.T) {
+            testOOOMmapReplay(t, scenario)
+        })
+    }
+}
+
+func testOOOMmapReplay(t *testing.T, scenario sampleTypeScenario) {
     dir := t.TempDir()
     wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, wlog.CompressionSnappy)
     require.NoError(t, err)
@@ -4820,8 +4841,7 @@ func TestOOOMmapReplay(t *testing.T) {
     l := labels.FromStrings("foo", "bar")
     appendSample := func(mins int64) {
         app := h.Appender(context.Background())
-        ts, v := mins*time.Minute.Milliseconds(), float64(mins)
-        _, err := app.Append(0, l, ts, v)
+        _, _, err := scenario.appendFunc(app, l, mins*time.Minute.Milliseconds(), mins)
         require.NoError(t, err)
         require.NoError(t, app.Commit())
     }
@@ -5096,6 +5116,14 @@ func TestReplayAfterMmapReplayError(t *testing.T) {
 }
 
 func TestOOOAppendWithNoSeries(t *testing.T) {
+    for name, scenario := range sampleTypeScenarios {
+        t.Run(name, func(t *testing.T) {
+            testOOOAppendWithNoSeries(t, scenario.appendFunc)
+        })
+    }
+}
+
+func testOOOAppendWithNoSeries(t *testing.T, appendFunc func(appender storage.Appender, lbls labels.Labels, ts, value int64) (storage.SeriesRef, sample, error)) {
     dir := t.TempDir()
     wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, wlog.CompressionSnappy)
     require.NoError(t, err)
@@ -5116,7 +5144,7 @@ func TestOOOAppendWithNoSeries(t *testing.T) {
 
     appendSample := func(lbls labels.Labels, ts int64) {
         app := h.Appender(context.Background())
-        _, err := app.Append(0, lbls, ts*time.Minute.Milliseconds(), float64(ts))
+        _, _, err := appendFunc(app, lbls, ts*time.Minute.Milliseconds(), ts)
         require.NoError(t, err)
         require.NoError(t, app.Commit())
     }
@@ -5164,7 +5192,7 @@ func TestOOOAppendWithNoSeries(t *testing.T) {
     // Now 179m is too old.
     s4 := newLabels(4)
     app := h.Appender(context.Background())
-    _, err = app.Append(0, s4, 179*time.Minute.Milliseconds(), float64(179))
+    _, _, err = appendFunc(app, s4, 179*time.Minute.Milliseconds(), 179)
     require.Equal(t, storage.ErrTooOldSample, err)
     require.NoError(t, app.Rollback())
     verifyOOOSamples(s3, 1)
@@ -5177,6 +5205,14 @@ func TestOOOAppendWithNoSeries(t *testing.T) {
 }
 
 func TestHeadMinOOOTimeUpdate(t *testing.T) {
+    for name, scenario := range sampleTypeScenarios {
+        t.Run(name, func(t *testing.T) {
+            testHeadMinOOOTimeUpdate(t, scenario)
+        })
+    }
+}
+
+func testHeadMinOOOTimeUpdate(t *testing.T, scenario sampleTypeScenario) {
     dir := t.TempDir()
     wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, wlog.CompressionSnappy)
     require.NoError(t, err)
@@ -5195,15 +5231,13 @@ func TestHeadMinOOOTimeUpdate(t *testing.T) {
     require.NoError(t, h.Init(0))
 
     appendSample := func(ts int64) {
-        lbls := labels.FromStrings("foo", "bar")
         app := h.Appender(context.Background())
-        _, err := app.Append(0, lbls, ts*time.Minute.Milliseconds(), float64(ts))
+        _, _, err = scenario.appendFunc(app, labels.FromStrings("a", "b"), ts*time.Minute.Milliseconds(), 99.0)
         require.NoError(t, err)
         require.NoError(t, app.Commit())
     }
 
     appendSample(300) // In-order sample.
 
     require.Equal(t, int64(math.MaxInt64), h.MinOOOTime())
 
     appendSample(295) // OOO sample.

@@ -359,6 +359,15 @@ func TestOOOHeadIndexReader_Series(t *testing.T) {
 }
 
 func TestOOOHeadChunkReader_LabelValues(t *testing.T) {
+    for name, scenario := range sampleTypeScenarios {
+        t.Run(name, func(t *testing.T) {
+            testOOOHeadChunkReader_LabelValues(t, scenario)
+        })
+    }
+}
+
+//nolint:revive // unexported-return.
+func testOOOHeadChunkReader_LabelValues(t *testing.T, scenario sampleTypeScenario) {
     chunkRange := int64(2000)
     head, _ := newTestHead(t, chunkRange, wlog.CompressionNone, true)
     t.Cleanup(func() { require.NoError(t, head.Close()) })
@@ -368,15 +377,15 @@ func TestOOOHeadChunkReader_LabelValues(t *testing.T) {
     app := head.Appender(context.Background())
 
     // Add in-order samples
-    _, err := app.Append(0, labels.FromStrings("foo", "bar1"), 100, 1)
+    _, _, err := scenario.appendFunc(app, labels.FromStrings("foo", "bar1"), 100, int64(1))
     require.NoError(t, err)
-    _, err = app.Append(0, labels.FromStrings("foo", "bar2"), 100, 2)
+    _, _, err = scenario.appendFunc(app, labels.FromStrings("foo", "bar2"), 100, int64(2))
     require.NoError(t, err)
 
     // Add ooo samples for those series
-    _, err = app.Append(0, labels.FromStrings("foo", "bar1"), 90, 1)
+    _, _, err = scenario.appendFunc(app, labels.FromStrings("foo", "bar1"), 90, int64(1))
     require.NoError(t, err)
-    _, err = app.Append(0, labels.FromStrings("foo", "bar2"), 90, 2)
+    _, _, err = scenario.appendFunc(app, labels.FromStrings("foo", "bar2"), 90, int64(2))
     require.NoError(t, err)
 
     require.NoError(t, app.Commit())
@@ -453,6 +462,19 @@ func TestOOOHeadChunkReader_LabelValues(t *testing.T) {
 // It does so by appending out of order samples to the db and then initializing
 // an OOOHeadChunkReader to read chunks from it.
 func TestOOOHeadChunkReader_Chunk(t *testing.T) {
+    for name, scenario := range sampleTypeScenarios {
+        t.Run(name, func(t *testing.T) {
+            testOOOHeadChunkReader_Chunk(t, scenario)
+        })
+    }
+}
+
+// TestOOOHeadChunkReader_Chunk tests that the Chunk method works as expected.
+// It does so by appending out of order samples to the db and then initializing
+// an OOOHeadChunkReader to read chunks from it.
+//
+//nolint:revive // unexported-return.
+func testOOOHeadChunkReader_Chunk(t *testing.T, scenario sampleTypeScenario) {
     opts := DefaultOptions()
     opts.OutOfOrderCapMax = 5
     opts.OutOfOrderTimeWindow = 120 * time.Minute.Milliseconds()
@@ -460,12 +482,6 @@ func TestOOOHeadChunkReader_Chunk(t *testing.T) {
     s1 := labels.FromStrings("l", "v1")
     minutes := func(m int64) int64 { return m * time.Minute.Milliseconds() }
 
-    appendSample := func(app storage.Appender, l labels.Labels, timestamp int64, value float64) storage.SeriesRef {
-        ref, err := app.Append(0, l, timestamp, value)
-        require.NoError(t, err)
-        return ref
-    }
-
     t.Run("Getting a non existing chunk fails with not found error", func(t *testing.T) {
         db := newTestDBWithOpts(t, opts)
 
@@ -484,7 +500,7 @@ func TestOOOHeadChunkReader_Chunk(t *testing.T) {
         queryMinT            int64
         queryMaxT            int64
         firstInOrderSampleAt int64
-        inputSamples         chunks.SampleSlice
+        inputSamples         []tsValue
         expChunkError        bool
         expChunksSamples     []chunks.SampleSlice
     }{
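The cases below build inputSamples as []tsValue literals such as {Ts: minutes(30), V: 0}. The tsValue type itself is not part of this diff, so the definition below is only an assumed shape inferred from that usage:

    // Assumed shape only, not taken from the diff: a timestamp/value pair that
    // each scenario's sampleFunc later turns into a sample of its own type.
    package main

    import "fmt"

    type tsValue struct {
        Ts int64
        V  int64
    }

    func main() {
        minutes := func(m int64) int64 { return m * 60 * 1000 }
        fmt.Println([]tsValue{{Ts: minutes(30), V: 0}, {Ts: minutes(40), V: 0}})
    }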
@@ -493,9 +509,9 @@ func TestOOOHeadChunkReader_Chunk(t *testing.T) {
             queryMinT:            minutes(0),
             queryMaxT:            minutes(100),
             firstInOrderSampleAt: minutes(120),
-            inputSamples: chunks.SampleSlice{
-                sample{t: minutes(30), f: float64(0)},
-                sample{t: minutes(40), f: float64(0)},
+            inputSamples: []tsValue{
+                {Ts: minutes(30), V: 0},
+                {Ts: minutes(40), V: 0},
             },
             expChunkError: false,
             // ts (in minutes) 0 10 20 30 40 50 60 70 80 90 100
@@ -504,8 +520,8 @@ func TestOOOHeadChunkReader_Chunk(t *testing.T) {
             // Output Graphically [--------] (With 2 samples)
             expChunksSamples: []chunks.SampleSlice{
                 {
-                    sample{t: minutes(30), f: float64(0)},
-                    sample{t: minutes(40), f: float64(0)},
+                    scenario.sampleFunc(minutes(30), 0),
+                    scenario.sampleFunc(minutes(40), 0),
                 },
             },
         },
@@ -514,19 +530,8 @@ func TestOOOHeadChunkReader_Chunk(t *testing.T) {
             queryMinT:            minutes(0),
             queryMaxT:            minutes(100),
             firstInOrderSampleAt: minutes(120),
-            inputSamples: chunks.SampleSlice{
-                // opts.OOOCapMax is 5 so these will be mmapped to the first mmapped chunk
-                sample{t: minutes(41), f: float64(0)},
-                sample{t: minutes(42), f: float64(0)},
-                sample{t: minutes(43), f: float64(0)},
-                sample{t: minutes(44), f: float64(0)},
-                sample{t: minutes(45), f: float64(0)},
-                // The following samples will go to the head chunk, and we want it
-                // to overlap with the previous chunk
-                sample{t: minutes(30), f: float64(1)},
-                sample{t: minutes(50), f: float64(1)},
-            },
-            expChunkError: false,
+            inputSamples:         []tsValue{{Ts: minutes(41), V: 0}, {Ts: minutes(42), V: 0}, {Ts: minutes(43), V: 0}, {Ts: minutes(44), V: 0}, {Ts: minutes(45), V: 0}, {Ts: minutes(30), V: 1}, {Ts: minutes(50), V: 1}},
+            expChunkError:        false,
             // ts (in minutes) 0 10 20 30 40 50 60 70 80 90 100
             // Query Interval [------------------------------------------------------------------------------------------]
             // Chunk 0 [---] (With 5 samples)
@@ -534,13 +539,13 @@ func TestOOOHeadChunkReader_Chunk(t *testing.T) {
             // Output Graphically [-----------------] (With 7 samples)
             expChunksSamples: []chunks.SampleSlice{
                 {
-                    sample{t: minutes(30), f: float64(1)},
-                    sample{t: minutes(41), f: float64(0)},
-                    sample{t: minutes(42), f: float64(0)},
-                    sample{t: minutes(43), f: float64(0)},
-                    sample{t: minutes(44), f: float64(0)},
-                    sample{t: minutes(45), f: float64(0)},
-                    sample{t: minutes(50), f: float64(1)},
+                    scenario.sampleFunc(minutes(30), 1),
+                    scenario.sampleFunc(minutes(41), 0),
+                    scenario.sampleFunc(minutes(42), 0),
+                    scenario.sampleFunc(minutes(43), 0),
+                    scenario.sampleFunc(minutes(44), 0),
+                    scenario.sampleFunc(minutes(45), 0),
+                    scenario.sampleFunc(minutes(50), 1),
                 },
             },
         },
@@ -549,28 +554,28 @@ func TestOOOHeadChunkReader_Chunk(t *testing.T) {
             queryMinT:            minutes(0),
             queryMaxT:            minutes(100),
             firstInOrderSampleAt: minutes(120),
-            inputSamples: chunks.SampleSlice{
+            inputSamples: []tsValue{
                 // Chunk 0
-                sample{t: minutes(10), f: float64(0)},
-                sample{t: minutes(12), f: float64(0)},
-                sample{t: minutes(14), f: float64(0)},
-                sample{t: minutes(16), f: float64(0)},
-                sample{t: minutes(20), f: float64(0)},
+                {Ts: minutes(10), V: 0},
+                {Ts: minutes(12), V: 0},
+                {Ts: minutes(14), V: 0},
+                {Ts: minutes(16), V: 0},
+                {Ts: minutes(20), V: 0},
                 // Chunk 1
-                sample{t: minutes(20), f: float64(1)},
-                sample{t: minutes(22), f: float64(1)},
-                sample{t: minutes(24), f: float64(1)},
-                sample{t: minutes(26), f: float64(1)},
-                sample{t: minutes(29), f: float64(1)},
-                // Chunk 2
-                sample{t: minutes(30), f: float64(2)},
-                sample{t: minutes(32), f: float64(2)},
-                sample{t: minutes(34), f: float64(2)},
-                sample{t: minutes(36), f: float64(2)},
-                sample{t: minutes(40), f: float64(2)},
+                {Ts: minutes(20), V: 1},
+                {Ts: minutes(22), V: 1},
+                {Ts: minutes(24), V: 1},
+                {Ts: minutes(26), V: 1},
+                {Ts: minutes(29), V: 1},
+                // Chunk 3
+                {Ts: minutes(30), V: 2},
+                {Ts: minutes(32), V: 2},
+                {Ts: minutes(34), V: 2},
+                {Ts: minutes(36), V: 2},
+                {Ts: minutes(40), V: 2},
                 // Head
-                sample{t: minutes(40), f: float64(3)},
-                sample{t: minutes(50), f: float64(3)},
+                {Ts: minutes(40), V: 3},
+                {Ts: minutes(50), V: 3},
             },
             expChunkError: false,
             // ts (in minutes) 0 10 20 30 40 50 60 70 80 90 100
@@ -582,23 +587,23 @@ func TestOOOHeadChunkReader_Chunk(t *testing.T) {
             // Output Graphically [----------------][-----------------]
             expChunksSamples: []chunks.SampleSlice{
                 {
-                    sample{t: minutes(10), f: float64(0)},
-                    sample{t: minutes(12), f: float64(0)},
-                    sample{t: minutes(14), f: float64(0)},
-                    sample{t: minutes(16), f: float64(0)},
-                    sample{t: minutes(20), f: float64(1)},
-                    sample{t: minutes(22), f: float64(1)},
-                    sample{t: minutes(24), f: float64(1)},
-                    sample{t: minutes(26), f: float64(1)},
-                    sample{t: minutes(29), f: float64(1)},
+                    scenario.sampleFunc(minutes(10), 0),
+                    scenario.sampleFunc(minutes(12), 0),
+                    scenario.sampleFunc(minutes(14), 0),
+                    scenario.sampleFunc(minutes(16), 0),
+                    scenario.sampleFunc(minutes(20), 1),
+                    scenario.sampleFunc(minutes(22), 1),
+                    scenario.sampleFunc(minutes(24), 1),
+                    scenario.sampleFunc(minutes(26), 1),
+                    scenario.sampleFunc(minutes(29), 1),
                 },
                 {
-                    sample{t: minutes(30), f: float64(2)},
-                    sample{t: minutes(32), f: float64(2)},
-                    sample{t: minutes(34), f: float64(2)},
-                    sample{t: minutes(36), f: float64(2)},
-                    sample{t: minutes(40), f: float64(3)},
-                    sample{t: minutes(50), f: float64(3)},
+                    scenario.sampleFunc(minutes(30), 2),
+                    scenario.sampleFunc(minutes(32), 2),
+                    scenario.sampleFunc(minutes(34), 2),
+                    scenario.sampleFunc(minutes(36), 2),
+                    scenario.sampleFunc(minutes(40), 3),
+                    scenario.sampleFunc(minutes(50), 3),
                 },
             },
         },
@@ -607,28 +612,28 @@ func TestOOOHeadChunkReader_Chunk(t *testing.T) {
             queryMinT:            minutes(0),
             queryMaxT:            minutes(100),
             firstInOrderSampleAt: minutes(120),
-            inputSamples: chunks.SampleSlice{
+            inputSamples: []tsValue{
                 // Chunk 0
-                sample{t: minutes(40), f: float64(0)},
-                sample{t: minutes(42), f: float64(0)},
-                sample{t: minutes(44), f: float64(0)},
-                sample{t: minutes(46), f: float64(0)},
-                sample{t: minutes(50), f: float64(0)},
+                {Ts: minutes(40), V: 0},
+                {Ts: minutes(42), V: 0},
+                {Ts: minutes(44), V: 0},
+                {Ts: minutes(46), V: 0},
+                {Ts: minutes(50), V: 0},
                 // Chunk 1
-                sample{t: minutes(30), f: float64(1)},
-                sample{t: minutes(32), f: float64(1)},
-                sample{t: minutes(34), f: float64(1)},
-                sample{t: minutes(36), f: float64(1)},
-                sample{t: minutes(40), f: float64(1)},
-                // Chunk 2
-                sample{t: minutes(20), f: float64(2)},
-                sample{t: minutes(22), f: float64(2)},
-                sample{t: minutes(24), f: float64(2)},
-                sample{t: minutes(26), f: float64(2)},
-                sample{t: minutes(29), f: float64(2)},
+                {Ts: minutes(30), V: 1},
+                {Ts: minutes(32), V: 1},
+                {Ts: minutes(34), V: 1},
+                {Ts: minutes(36), V: 1},
+                {Ts: minutes(40), V: 1},
+                // Chunk 3
+                {Ts: minutes(20), V: 2},
+                {Ts: minutes(22), V: 2},
+                {Ts: minutes(24), V: 2},
+                {Ts: minutes(26), V: 2},
+                {Ts: minutes(29), V: 2},
                 // Head
-                sample{t: minutes(10), f: float64(3)},
-                sample{t: minutes(20), f: float64(3)},
+                {Ts: minutes(10), V: 3},
+                {Ts: minutes(20), V: 3},
             },
             expChunkError: false,
             // ts (in minutes) 0 10 20 30 40 50 60 70 80 90 100
@@ -640,23 +645,23 @@ func TestOOOHeadChunkReader_Chunk(t *testing.T) {
             // Output Graphically [----------------][-----------------]
             expChunksSamples: []chunks.SampleSlice{
                 {
-                    sample{t: minutes(10), f: float64(3)},
-                    sample{t: minutes(20), f: float64(2)},
-                    sample{t: minutes(22), f: float64(2)},
-                    sample{t: minutes(24), f: float64(2)},
-                    sample{t: minutes(26), f: float64(2)},
-                    sample{t: minutes(29), f: float64(2)},
+                    scenario.sampleFunc(minutes(10), 3),
+                    scenario.sampleFunc(minutes(20), 2),
+                    scenario.sampleFunc(minutes(22), 2),
+                    scenario.sampleFunc(minutes(24), 2),
+                    scenario.sampleFunc(minutes(26), 2),
+                    scenario.sampleFunc(minutes(29), 2),
                 },
                 {
-                    sample{t: minutes(30), f: float64(1)},
-                    sample{t: minutes(32), f: float64(1)},
-                    sample{t: minutes(34), f: float64(1)},
-                    sample{t: minutes(36), f: float64(1)},
-                    sample{t: minutes(40), f: float64(0)},
-                    sample{t: minutes(42), f: float64(0)},
-                    sample{t: minutes(44), f: float64(0)},
-                    sample{t: minutes(46), f: float64(0)},
-                    sample{t: minutes(50), f: float64(0)},
+                    scenario.sampleFunc(minutes(30), 1),
+                    scenario.sampleFunc(minutes(32), 1),
+                    scenario.sampleFunc(minutes(34), 1),
+                    scenario.sampleFunc(minutes(36), 1),
+                    scenario.sampleFunc(minutes(40), 0),
+                    scenario.sampleFunc(minutes(42), 0),
+                    scenario.sampleFunc(minutes(44), 0),
+                    scenario.sampleFunc(minutes(46), 0),
+                    scenario.sampleFunc(minutes(50), 0),
                 },
             },
         },
@@ -665,28 +670,28 @@ func TestOOOHeadChunkReader_Chunk(t *testing.T) {
             queryMinT:            minutes(0),
             queryMaxT:            minutes(100),
             firstInOrderSampleAt: minutes(120),
-            inputSamples: chunks.SampleSlice{
+            inputSamples: []tsValue{
                 // Chunk 0
-                sample{t: minutes(10), f: float64(0)},
-                sample{t: minutes(12), f: float64(0)},
-                sample{t: minutes(14), f: float64(0)},
-                sample{t: minutes(16), f: float64(0)},
-                sample{t: minutes(18), f: float64(0)},
+                {Ts: minutes(10), V: 0},
+                {Ts: minutes(12), V: 0},
+                {Ts: minutes(14), V: 0},
+                {Ts: minutes(16), V: 0},
+                {Ts: minutes(18), V: 0},
                 // Chunk 1
-                sample{t: minutes(20), f: float64(1)},
-                sample{t: minutes(22), f: float64(1)},
-                sample{t: minutes(24), f: float64(1)},
-                sample{t: minutes(26), f: float64(1)},
-                sample{t: minutes(28), f: float64(1)},
-                // Chunk 2
-                sample{t: minutes(30), f: float64(2)},
-                sample{t: minutes(32), f: float64(2)},
-                sample{t: minutes(34), f: float64(2)},
-                sample{t: minutes(36), f: float64(2)},
-                sample{t: minutes(38), f: float64(2)},
+                {Ts: minutes(20), V: 1},
+                {Ts: minutes(22), V: 1},
+                {Ts: minutes(24), V: 1},
+                {Ts: minutes(26), V: 1},
+                {Ts: minutes(28), V: 1},
+                // Chunk 3
+                {Ts: minutes(30), V: 2},
+                {Ts: minutes(32), V: 2},
+                {Ts: minutes(34), V: 2},
+                {Ts: minutes(36), V: 2},
+                {Ts: minutes(38), V: 2},
                 // Head
-                sample{t: minutes(40), f: float64(3)},
-                sample{t: minutes(42), f: float64(3)},
+                {Ts: minutes(40), V: 3},
+                {Ts: minutes(42), V: 3},
             },
             expChunkError: false,
             // ts (in minutes) 0 10 20 30 40 50 60 70 80 90 100
@@ -698,29 +703,29 @@ func TestOOOHeadChunkReader_Chunk(t *testing.T) {
             // Output Graphically [-------][-------][-------][--------]
             expChunksSamples: []chunks.SampleSlice{
                 {
-                    sample{t: minutes(10), f: float64(0)},
-                    sample{t: minutes(12), f: float64(0)},
-                    sample{t: minutes(14), f: float64(0)},
-                    sample{t: minutes(16), f: float64(0)},
-                    sample{t: minutes(18), f: float64(0)},
+                    scenario.sampleFunc(minutes(10), 0),
+                    scenario.sampleFunc(minutes(12), 0),
+                    scenario.sampleFunc(minutes(14), 0),
+                    scenario.sampleFunc(minutes(16), 0),
+                    scenario.sampleFunc(minutes(18), 0),
                 },
                 {
-                    sample{t: minutes(20), f: float64(1)},
-                    sample{t: minutes(22), f: float64(1)},
-                    sample{t: minutes(24), f: float64(1)},
-                    sample{t: minutes(26), f: float64(1)},
-                    sample{t: minutes(28), f: float64(1)},
+                    scenario.sampleFunc(minutes(20), 1),
+                    scenario.sampleFunc(minutes(22), 1),
+                    scenario.sampleFunc(minutes(24), 1),
+                    scenario.sampleFunc(minutes(26), 1),
+                    scenario.sampleFunc(minutes(28), 1),
                 },
                 {
-                    sample{t: minutes(30), f: float64(2)},
-                    sample{t: minutes(32), f: float64(2)},
-                    sample{t: minutes(34), f: float64(2)},
-                    sample{t: minutes(36), f: float64(2)},
-                    sample{t: minutes(38), f: float64(2)},
+                    scenario.sampleFunc(minutes(30), 2),
+                    scenario.sampleFunc(minutes(32), 2),
+                    scenario.sampleFunc(minutes(34), 2),
+                    scenario.sampleFunc(minutes(36), 2),
+                    scenario.sampleFunc(minutes(38), 2),
                 },
                 {
-                    sample{t: minutes(40), f: float64(3)},
-                    sample{t: minutes(42), f: float64(3)},
+                    scenario.sampleFunc(minutes(40), 3),
+                    scenario.sampleFunc(minutes(42), 3),
                 },
             },
         },
@@ -729,22 +734,22 @@ func TestOOOHeadChunkReader_Chunk(t *testing.T) {
             queryMinT:            minutes(0),
             queryMaxT:            minutes(100),
             firstInOrderSampleAt: minutes(120),
-            inputSamples: chunks.SampleSlice{
+            inputSamples: []tsValue{
                 // Chunk 0
-                sample{t: minutes(10), f: float64(0)},
-                sample{t: minutes(15), f: float64(0)},
-                sample{t: minutes(20), f: float64(0)},
-                sample{t: minutes(25), f: float64(0)},
-                sample{t: minutes(30), f: float64(0)},
+                {Ts: minutes(10), V: 0},
+                {Ts: minutes(15), V: 0},
+                {Ts: minutes(20), V: 0},
+                {Ts: minutes(25), V: 0},
+                {Ts: minutes(30), V: 0},
                 // Chunk 1
-                sample{t: minutes(20), f: float64(1)},
-                sample{t: minutes(25), f: float64(1)},
-                sample{t: minutes(30), f: float64(1)},
-                sample{t: minutes(35), f: float64(1)},
-                sample{t: minutes(42), f: float64(1)},
+                {Ts: minutes(20), V: 1},
+                {Ts: minutes(25), V: 1},
+                {Ts: minutes(30), V: 1},
+                {Ts: minutes(35), V: 1},
+                {Ts: minutes(42), V: 1},
                 // Chunk 2 Head
-                sample{t: minutes(32), f: float64(2)},
-                sample{t: minutes(50), f: float64(2)},
+                {Ts: minutes(32), V: 2},
+                {Ts: minutes(50), V: 2},
             },
             expChunkError: false,
             // ts (in minutes) 0 10 20 30 40 50 60 70 80 90 100
@@ -755,15 +760,15 @@ func TestOOOHeadChunkReader_Chunk(t *testing.T) {
             // Output Graphically [-----------------------------------]
             expChunksSamples: []chunks.SampleSlice{
                 {
-                    sample{t: minutes(10), f: float64(0)},
-                    sample{t: minutes(15), f: float64(0)},
-                    sample{t: minutes(20), f: float64(1)},
-                    sample{t: minutes(25), f: float64(1)},
-                    sample{t: minutes(30), f: float64(1)},
-                    sample{t: minutes(32), f: float64(2)},
-                    sample{t: minutes(35), f: float64(1)},
-                    sample{t: minutes(42), f: float64(1)},
-                    sample{t: minutes(50), f: float64(2)},
+                    scenario.sampleFunc(minutes(10), 0),
+                    scenario.sampleFunc(minutes(15), 0),
+                    scenario.sampleFunc(minutes(20), 1),
+                    scenario.sampleFunc(minutes(25), 1),
+                    scenario.sampleFunc(minutes(30), 1),
+                    scenario.sampleFunc(minutes(32), 2),
+                    scenario.sampleFunc(minutes(35), 1),
+                    scenario.sampleFunc(minutes(42), 1),
+                    scenario.sampleFunc(minutes(50), 2),
                 },
             },
         },
@ -772,22 +777,22 @@ func TestOOOHeadChunkReader_Chunk(t *testing.T) {
queryMinT: minutes(12),
queryMaxT: minutes(33),
firstInOrderSampleAt: minutes(120),
inputSamples: chunks.SampleSlice{
inputSamples: []tsValue{
// Chunk 0
sample{t: minutes(10), f: float64(0)},
sample{t: minutes(15), f: float64(0)},
sample{t: minutes(20), f: float64(0)},
sample{t: minutes(25), f: float64(0)},
sample{t: minutes(30), f: float64(0)},
{Ts: minutes(10), V: 0},
{Ts: minutes(15), V: 0},
{Ts: minutes(20), V: 0},
{Ts: minutes(25), V: 0},
{Ts: minutes(30), V: 0},
// Chunk 1
sample{t: minutes(20), f: float64(1)},
sample{t: minutes(25), f: float64(1)},
sample{t: minutes(30), f: float64(1)},
sample{t: minutes(35), f: float64(1)},
sample{t: minutes(42), f: float64(1)},
{Ts: minutes(20), V: 1},
{Ts: minutes(25), V: 1},
{Ts: minutes(30), V: 1},
{Ts: minutes(35), V: 1},
{Ts: minutes(42), V: 1},
// Chunk 2 Head
sample{t: minutes(32), f: float64(2)},
sample{t: minutes(50), f: float64(2)},
{Ts: minutes(32), V: 2},
{Ts: minutes(50), V: 2},
},
expChunkError: false,
// ts (in minutes) 0 10 20 30 40 50 60 70 80 90 100
@ -798,15 +803,15 @@ func TestOOOHeadChunkReader_Chunk(t *testing.T) {
// Output Graphically [-----------------------------------]
expChunksSamples: []chunks.SampleSlice{
{
sample{t: minutes(10), f: float64(0)},
sample{t: minutes(15), f: float64(0)},
sample{t: minutes(20), f: float64(1)},
sample{t: minutes(25), f: float64(1)},
sample{t: minutes(30), f: float64(1)},
sample{t: minutes(32), f: float64(2)},
sample{t: minutes(35), f: float64(1)},
sample{t: minutes(42), f: float64(1)},
sample{t: minutes(50), f: float64(2)},
scenario.sampleFunc(minutes(10), 0),
scenario.sampleFunc(minutes(15), 0),
scenario.sampleFunc(minutes(20), 1),
scenario.sampleFunc(minutes(25), 1),
scenario.sampleFunc(minutes(30), 1),
scenario.sampleFunc(minutes(32), 2),
scenario.sampleFunc(minutes(35), 1),
scenario.sampleFunc(minutes(42), 1),
scenario.sampleFunc(minutes(50), 2),
},
},
},
@ -817,13 +822,14 @@ func TestOOOHeadChunkReader_Chunk(t *testing.T) {
db := newTestDBWithOpts(t, opts)

app := db.Appender(context.Background())
s1Ref := appendSample(app, s1, tc.firstInOrderSampleAt, float64(tc.firstInOrderSampleAt/1*time.Minute.Milliseconds()))
s1Ref, _, _ := scenario.appendFunc(app, s1, tc.firstInOrderSampleAt, tc.firstInOrderSampleAt/1*time.Minute.Milliseconds())
require.NoError(t, app.Commit())

// OOO few samples for s1.
app = db.Appender(context.Background())
for _, s := range tc.inputSamples {
appendSample(app, s1, s.T(), s.F())
_, _, err := scenario.appendFunc(app, s1, s.Ts, s.V)
require.NoError(t, err)
}
require.NoError(t, app.Commit())

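The hunk above replaces direct app.Append calls with scenario.appendFunc, so each entry of sampleTypeScenarios has to know both how to write a sample of its type and how to describe the expected result. Neither the struct nor the scenario table appears in this diff; a plausible float-only sketch, with the field names taken from the call sites above and everything else assumed, would be:

	// Sketch of the scenario plumbing assumed by these tests; the committed
	// version presumably also has histogram and float-histogram entries.
	type sampleTypeScenario struct {
		sampleType string
		// appendFunc writes one sample of this type and reports what it wrote.
		appendFunc func(app storage.Appender, lbls labels.Labels, ts, value int64) (storage.SeriesRef, sample, error)
		// sampleFunc builds the matching expected sample for assertions.
		sampleFunc func(ts, value int64) sample
	}

	var sampleTypeScenarios = map[string]sampleTypeScenario{
		"float": {
			sampleType: "float",
			appendFunc: func(app storage.Appender, lbls labels.Labels, ts, value int64) (storage.SeriesRef, sample, error) {
				s := sample{t: ts, f: float64(value)}
				ref, err := app.Append(0, lbls, ts, s.f)
				return ref, s, err
			},
			sampleFunc: func(ts, value int64) sample {
				return sample{t: ts, f: float64(value)}
			},
		},
	}

With a float entry wired like this, the old app.Append(0, series, ts, float64(ts)) call sites and the new scenario.appendFunc(app, series, ts, ts) call sites write identical data for the float case.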
@ -843,13 +849,9 @@ func TestOOOHeadChunkReader_Chunk(t *testing.T) {
require.NoError(t, err)
require.Nil(t, c)

var resultSamples chunks.SampleSlice
it := iterable.Iterator(nil)
for it.Next() == chunkenc.ValFloat {
t, v := it.At()
resultSamples = append(resultSamples, sample{t: t, f: v})
}
require.Equal(t, tc.expChunksSamples[i], resultSamples)
resultSamples := samplesFromIterator(t, it)
compareSamples(t, s1.String(), tc.expChunksSamples[i], resultSamples, true)
}
})
}
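The replaced loop decoded only float samples (it stopped at the first non-ValFloat value), which is why it has to go once the tests become scenario-driven. samplesFromIterator and compareSamples are not shown in this diff; a float-only sketch of the iterator helper, assuming the real one also handles histogram sample types, could look like:

	// Float-only sketch of a samplesFromIterator-style helper; the committed
	// helper presumably switches on every chunkenc value type, not just ValFloat.
	func samplesFromIterator(t testing.TB, it chunkenc.Iterator) []chunks.Sample {
		var result []chunks.Sample
		for typ := it.Next(); typ != chunkenc.ValNone; typ = it.Next() {
			switch typ {
			case chunkenc.ValFloat:
				ts, v := it.At()
				result = append(result, sample{t: ts, f: v})
			default:
				t.Fatalf("sample type %v not handled in this sketch", typ)
			}
		}
		require.NoError(t, it.Err())
		return result
	}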
@ -864,6 +866,24 @@ func TestOOOHeadChunkReader_Chunk(t *testing.T) {
// - Response B comes from : Series(), in parallel new samples added to the head, then Chunk()
// - A == B
func TestOOOHeadChunkReader_Chunk_ConsistentQueryResponseDespiteOfHeadExpanding(t *testing.T) {
for name, scenario := range sampleTypeScenarios {
t.Run(name, func(t *testing.T) {
testOOOHeadChunkReader_Chunk_ConsistentQueryResponseDespiteOfHeadExpanding(t, scenario)
})
}
}

// TestOOOHeadChunkReader_Chunk_ConsistentQueryResponseDespiteOfHeadExpanding tests
// that if a query comes and performs a Series() call followed by a Chunks() call
// the response is consistent with the data seen by Series() even if the OOO
// head receives more samples before Chunks() is called.
// An example:
// - Response A comes from: Series() then Chunk()
// - Response B comes from : Series(), in parallel new samples added to the head, then Chunk()
// - A == B
//
//nolint:revive // unexported-return.
func testOOOHeadChunkReader_Chunk_ConsistentQueryResponseDespiteOfHeadExpanding(t *testing.T, scenario sampleTypeScenario) {
opts := DefaultOptions()
opts.OutOfOrderCapMax = 5
opts.OutOfOrderTimeWindow = 120 * time.Minute.Milliseconds()
@ -871,19 +891,13 @@ func TestOOOHeadChunkReader_Chunk_ConsistentQueryResponseDespiteOfHeadExpanding(
s1 := labels.FromStrings("l", "v1")
minutes := func(m int64) int64 { return m * time.Minute.Milliseconds() }

appendSample := func(app storage.Appender, l labels.Labels, timestamp int64, value float64) storage.SeriesRef {
ref, err := app.Append(0, l, timestamp, value)
require.NoError(t, err)
return ref
}

tests := []struct {
name string
queryMinT int64
queryMaxT int64
firstInOrderSampleAt int64
initialSamples chunks.SampleSlice
samplesAfterSeriesCall chunks.SampleSlice
initialSamples []tsValue
samplesAfterSeriesCall []tsValue
expChunkError bool
expChunksSamples []chunks.SampleSlice
}{
@ -892,21 +906,21 @@ func TestOOOHeadChunkReader_Chunk_ConsistentQueryResponseDespiteOfHeadExpanding(
queryMinT: minutes(0),
queryMaxT: minutes(100),
firstInOrderSampleAt: minutes(120),
initialSamples: chunks.SampleSlice{
initialSamples: []tsValue{
// Chunk 0
sample{t: minutes(20), f: float64(0)},
sample{t: minutes(22), f: float64(0)},
sample{t: minutes(24), f: float64(0)},
sample{t: minutes(26), f: float64(0)},
sample{t: minutes(30), f: float64(0)},
{Ts: minutes(20), V: 0},
{Ts: minutes(22), V: 0},
{Ts: minutes(24), V: 0},
{Ts: minutes(26), V: 0},
{Ts: minutes(30), V: 0},
// Chunk 1 Head
sample{t: minutes(25), f: float64(1)},
sample{t: minutes(35), f: float64(1)},
{Ts: minutes(25), V: 1},
{Ts: minutes(35), V: 1},
},
samplesAfterSeriesCall: chunks.SampleSlice{
sample{t: minutes(10), f: float64(1)},
sample{t: minutes(32), f: float64(1)},
sample{t: minutes(50), f: float64(1)},
samplesAfterSeriesCall: []tsValue{
{Ts: minutes(10), V: 1},
{Ts: minutes(32), V: 1},
{Ts: minutes(50), V: 1},
},
expChunkError: false,
// ts (in minutes) 0 10 20 30 40 50 60 70 80 90 100
@ -918,39 +932,40 @@ func TestOOOHeadChunkReader_Chunk_ConsistentQueryResponseDespiteOfHeadExpanding(
// Output Graphically [------------] (With 8 samples, samples newer than lastmint or older than lastmaxt are omitted but the ones in between are kept)
expChunksSamples: []chunks.SampleSlice{
{
sample{t: minutes(20), f: float64(0)},
sample{t: minutes(22), f: float64(0)},
sample{t: minutes(24), f: float64(0)},
sample{t: minutes(25), f: float64(1)},
sample{t: minutes(26), f: float64(0)},
sample{t: minutes(30), f: float64(0)},
sample{t: minutes(35), f: float64(1)},
scenario.sampleFunc(minutes(20), 0),
scenario.sampleFunc(minutes(22), 0),
scenario.sampleFunc(minutes(24), 0),
scenario.sampleFunc(minutes(25), 1),
scenario.sampleFunc(minutes(26), 0),
scenario.sampleFunc(minutes(30), 0),
scenario.sampleFunc(minutes(32), 1), // This sample was added after Series() but before Chunk() and it's in between the lastmint and maxt so it should be kept
scenario.sampleFunc(minutes(35), 1),
},
},
},
{
name: "After Series() previous head gets mmapped after getting samples, new head gets new samples also overlapping, none of these should appear in the response.",
name: "After Series() prev head gets mmapped after getting samples, new head gets new samples also overlapping, none of these should appear in response.",
queryMinT: minutes(0),
queryMaxT: minutes(100),
firstInOrderSampleAt: minutes(120),
initialSamples: chunks.SampleSlice{
initialSamples: []tsValue{
// Chunk 0
sample{t: minutes(20), f: float64(0)},
sample{t: minutes(22), f: float64(0)},
sample{t: minutes(24), f: float64(0)},
sample{t: minutes(26), f: float64(0)},
sample{t: minutes(30), f: float64(0)},
{Ts: minutes(20), V: 0},
{Ts: minutes(22), V: 0},
{Ts: minutes(24), V: 0},
{Ts: minutes(26), V: 0},
{Ts: minutes(30), V: 0},
// Chunk 1 Head
sample{t: minutes(25), f: float64(1)},
sample{t: minutes(35), f: float64(1)},
{Ts: minutes(25), V: 1},
{Ts: minutes(35), V: 1},
},
samplesAfterSeriesCall: chunks.SampleSlice{
sample{t: minutes(10), f: float64(1)},
sample{t: minutes(32), f: float64(1)},
sample{t: minutes(50), f: float64(1)},
samplesAfterSeriesCall: []tsValue{
{Ts: minutes(10), V: 1},
{Ts: minutes(32), V: 1},
{Ts: minutes(50), V: 1},
// Chunk 1 gets mmapped and Chunk 2, the new head is born
sample{t: minutes(25), f: float64(2)},
sample{t: minutes(31), f: float64(2)},
{Ts: minutes(25), V: 2},
{Ts: minutes(31), V: 2},
},
expChunkError: false,
// ts (in minutes) 0 10 20 30 40 50 60 70 80 90 100
@ -963,6 +978,7 @@ func TestOOOHeadChunkReader_Chunk_ConsistentQueryResponseDespiteOfHeadExpanding(
// Output Graphically [------------] (8 samples) It has 5 from Chunk 0 and 3 from Chunk 1
expChunksSamples: []chunks.SampleSlice{
{
sample{t: minutes(20), f: float64(0)},
sample{t: minutes(22), f: float64(0)},
sample{t: minutes(24), f: float64(0)},
@ -970,6 +986,25 @@ func TestOOOHeadChunkReader_Chunk_ConsistentQueryResponseDespiteOfHeadExpanding(
sample{t: minutes(25), f: float64(1)},
sample{t: minutes(26), f: float64(0)},
sample{t: minutes(30), f: float64(0)},
sample{t: minutes(32), f: float64(1)}, // This sample was added after Series() but before Chunk() and it's in between the lastmint and maxt so it should be kept
sample{t: minutes(35), f: float64(1)},
scenario.sampleFunc(minutes(20), 0),
scenario.sampleFunc(minutes(22), 0),
scenario.sampleFunc(minutes(24), 0),
scenario.sampleFunc(minutes(25), 1),
scenario.sampleFunc(minutes(26), 0),
scenario.sampleFunc(minutes(30), 0),
scenario.sampleFunc(minutes(32), 1), // This sample was added after Series() but before Chunk() and it's in between the lastmint and maxt so it should be kept
scenario.sampleFunc(minutes(35), 1),
},
},
},
@ -980,13 +1015,15 @@ func TestOOOHeadChunkReader_Chunk_ConsistentQueryResponseDespiteOfHeadExpanding(
db := newTestDBWithOpts(t, opts)

app := db.Appender(context.Background())
s1Ref := appendSample(app, s1, tc.firstInOrderSampleAt, float64(tc.firstInOrderSampleAt/1*time.Minute.Milliseconds()))
s1Ref, _, err := scenario.appendFunc(app, s1, tc.firstInOrderSampleAt, tc.firstInOrderSampleAt/1*time.Minute.Milliseconds())
require.NoError(t, err)
require.NoError(t, app.Commit())

// OOO few samples for s1.
app = db.Appender(context.Background())
for _, s := range tc.initialSamples {
appendSample(app, s1, s.T(), s.F())
_, _, err := scenario.appendFunc(app, s1, s.Ts, s.V)
require.NoError(t, err)
}
require.NoError(t, app.Commit())

@ -995,7 +1032,7 @@ func TestOOOHeadChunkReader_Chunk_ConsistentQueryResponseDespiteOfHeadExpanding(
ir := NewOOOHeadIndexReader(db.head, tc.queryMinT, tc.queryMaxT, 0)
var chks []chunks.Meta
var b labels.ScratchBuilder
err := ir.Series(s1Ref, &b, &chks)
err = ir.Series(s1Ref, &b, &chks)
require.NoError(t, err)
require.Equal(t, len(tc.expChunksSamples), len(chks))

@ -1003,7 +1040,8 @@ func TestOOOHeadChunkReader_Chunk_ConsistentQueryResponseDespiteOfHeadExpanding(
// OOO few samples for s1.
app = db.Appender(context.Background())
for _, s := range tc.samplesAfterSeriesCall {
appendSample(app, s1, s.T(), s.F())
_, _, err = scenario.appendFunc(app, s1, s.Ts, s.V)
require.NoError(t, err)
}
require.NoError(t, app.Commit())

@ -1014,13 +1052,9 @@ func TestOOOHeadChunkReader_Chunk_ConsistentQueryResponseDespiteOfHeadExpanding(
require.NoError(t, err)
require.Nil(t, c)

var resultSamples chunks.SampleSlice
it := iterable.Iterator(nil)
for it.Next() == chunkenc.ValFloat {
ts, v := it.At()
resultSamples = append(resultSamples, sample{t: ts, f: v})
}
require.Equal(t, tc.expChunksSamples[i], resultSamples)
resultSamples := samplesFromIterator(t, it)
compareSamples(t, s1.String(), tc.expChunksSamples[i], resultSamples, true)
}
})
}