From 45a32a29ef3b4ad0c7ec87dc948b80320733e19e Mon Sep 17 00:00:00 2001 From: Carrie Edwards Date: Sun, 3 Mar 2024 11:44:12 -0800 Subject: [PATCH] Update tsdb tests to use test utils. Co-authored-by: Fiona Liao Signed-off-by: Carrie Edwards --- tsdb/db_test.go | 250 ++++++++++++------ tsdb/head_test.go | 88 +++++-- tsdb/ooo_head_read_test.go | 522 ++++++++++++++++++++----------------- 3 files changed, 515 insertions(+), 345 deletions(-) diff --git a/tsdb/db_test.go b/tsdb/db_test.go index 1fb6d30d61..3d6d55c90c 100644 --- a/tsdb/db_test.go +++ b/tsdb/db_test.go @@ -4495,6 +4495,14 @@ func TestMetadataAssertInMemoryData(t *testing.T) { // // are not included in this compaction. func TestOOOCompaction(t *testing.T) { + for name, scenario := range sampleTypeScenarios { + t.Run(name, func(t *testing.T) { + testOOOCompaction(t, scenario) + }) + } +} + +func testOOOCompaction(t *testing.T, scenario sampleTypeScenario) { dir := t.TempDir() ctx := context.Background() @@ -4516,9 +4524,9 @@ func TestOOOCompaction(t *testing.T) { app := db.Appender(context.Background()) for min := fromMins; min <= toMins; min++ { ts := min * time.Minute.Milliseconds() - _, err := app.Append(0, series1, ts, float64(ts)) + _, _, err := scenario.appendFunc(app, series1, ts, ts) require.NoError(t, err) - _, err = app.Append(0, series2, ts, float64(2*ts)) + _, _, err = scenario.appendFunc(app, series2, ts, 2*ts) require.NoError(t, err) } require.NoError(t, app.Commit()) @@ -4551,8 +4559,8 @@ func TestOOOCompaction(t *testing.T) { fromMins, toMins := r[0], r[1] for min := fromMins; min <= toMins; min++ { ts := min * time.Minute.Milliseconds() - series1Samples = append(series1Samples, sample{ts, float64(ts), nil, nil}) - series2Samples = append(series2Samples, sample{ts, float64(2 * ts), nil, nil}) + series1Samples = append(series1Samples, scenario.sampleFunc(ts, ts)) + series2Samples = append(series2Samples, scenario.sampleFunc(ts, 2*ts)) } } expRes := map[string][]chunks.Sample{ @@ -4564,7 +4572,7 @@ func TestOOOCompaction(t *testing.T) { require.NoError(t, err) actRes := query(t, q, labels.MustNewMatcher(labels.MatchRegexp, "foo", "bar.*")) - require.Equal(t, expRes, actRes) + requireEqualSamples(t, expRes, actRes, true) } verifyDBSamples() // Before any compaction. @@ -4619,8 +4627,8 @@ func TestOOOCompaction(t *testing.T) { series2Samples := make([]chunks.Sample, 0, toMins-fromMins+1) for min := fromMins; min <= toMins; min++ { ts := min * time.Minute.Milliseconds() - series1Samples = append(series1Samples, sample{ts, float64(ts), nil, nil}) - series2Samples = append(series2Samples, sample{ts, float64(2 * ts), nil, nil}) + series1Samples = append(series1Samples, scenario.sampleFunc(ts, ts)) + series2Samples = append(series2Samples, scenario.sampleFunc(ts, 2*ts)) } expRes := map[string][]chunks.Sample{ series1.String(): series1Samples, @@ -4631,7 +4639,7 @@ func TestOOOCompaction(t *testing.T) { require.NoError(t, err) actRes := query(t, q, labels.MustNewMatcher(labels.MatchRegexp, "foo", "bar.*")) - require.Equal(t, expRes, actRes) + requireEqualSamples(t, expRes, actRes, true) } // Checking for expected data in the blocks. @@ -4675,6 +4683,14 @@ func TestOOOCompaction(t *testing.T) { // TestOOOCompactionWithNormalCompaction tests if OOO compaction is performed // when the normal head's compaction is done. func TestOOOCompactionWithNormalCompaction(t *testing.T) { + for name, scenario := range sampleTypeScenarios { + t.Run(name, func(t *testing.T) { + testOOOCompactionWithNormalCompaction(t, scenario) + }) + } +} + +func testOOOCompactionWithNormalCompaction(t *testing.T, scenario sampleTypeScenario) { dir := t.TempDir() ctx := context.Background() @@ -4696,9 +4712,9 @@ func TestOOOCompactionWithNormalCompaction(t *testing.T) { app := db.Appender(context.Background()) for min := fromMins; min <= toMins; min++ { ts := min * time.Minute.Milliseconds() - _, err := app.Append(0, series1, ts, float64(ts)) + _, _, err := scenario.appendFunc(app, series1, ts, ts) require.NoError(t, err) - _, err = app.Append(0, series2, ts, float64(2*ts)) + _, _, err = scenario.appendFunc(app, series2, ts, 2*ts) require.NoError(t, err) } require.NoError(t, app.Commit()) @@ -4751,8 +4767,8 @@ func TestOOOCompactionWithNormalCompaction(t *testing.T) { series2Samples := make([]chunks.Sample, 0, toMins-fromMins+1) for min := fromMins; min <= toMins; min++ { ts := min * time.Minute.Milliseconds() - series1Samples = append(series1Samples, sample{ts, float64(ts), nil, nil}) - series2Samples = append(series2Samples, sample{ts, float64(2 * ts), nil, nil}) + series1Samples = append(series1Samples, scenario.sampleFunc(ts, ts)) + series2Samples = append(series2Samples, scenario.sampleFunc(ts, 2*ts)) } expRes := map[string][]chunks.Sample{ series1.String(): series1Samples, @@ -4763,7 +4779,7 @@ func TestOOOCompactionWithNormalCompaction(t *testing.T) { require.NoError(t, err) actRes := query(t, q, labels.MustNewMatcher(labels.MatchRegexp, "foo", "bar.*")) - require.Equal(t, expRes, actRes) + requireEqualSamples(t, expRes, actRes, true) } // Checking for expected data in the blocks. @@ -4775,6 +4791,14 @@ func TestOOOCompactionWithNormalCompaction(t *testing.T) { // configured to not have wal and wbl but its able to compact both the in-order // and out-of-order head. func TestOOOCompactionWithDisabledWriteLog(t *testing.T) { + for name, scenario := range sampleTypeScenarios { + t.Run(name, func(t *testing.T) { + testOOOCompactionWithDisabledWriteLog(t, scenario) + }) + } +} + +func testOOOCompactionWithDisabledWriteLog(t *testing.T, scenario sampleTypeScenario) { dir := t.TempDir() ctx := context.Background() @@ -4797,9 +4821,9 @@ func TestOOOCompactionWithDisabledWriteLog(t *testing.T) { app := db.Appender(context.Background()) for min := fromMins; min <= toMins; min++ { ts := min * time.Minute.Milliseconds() - _, err := app.Append(0, series1, ts, float64(ts)) + _, _, err := scenario.appendFunc(app, series1, ts, ts) require.NoError(t, err) - _, err = app.Append(0, series2, ts, float64(2*ts)) + _, _, err = scenario.appendFunc(app, series2, ts, 2*ts) require.NoError(t, err) } require.NoError(t, app.Commit()) @@ -4852,8 +4876,8 @@ func TestOOOCompactionWithDisabledWriteLog(t *testing.T) { series2Samples := make([]chunks.Sample, 0, toMins-fromMins+1) for min := fromMins; min <= toMins; min++ { ts := min * time.Minute.Milliseconds() - series1Samples = append(series1Samples, sample{ts, float64(ts), nil, nil}) - series2Samples = append(series2Samples, sample{ts, float64(2 * ts), nil, nil}) + series1Samples = append(series1Samples, scenario.sampleFunc(ts, ts)) + series2Samples = append(series2Samples, scenario.sampleFunc(ts, 2*ts)) } expRes := map[string][]chunks.Sample{ series1.String(): series1Samples, @@ -4864,7 +4888,7 @@ func TestOOOCompactionWithDisabledWriteLog(t *testing.T) { require.NoError(t, err) actRes := query(t, q, labels.MustNewMatcher(labels.MatchRegexp, "foo", "bar.*")) - require.Equal(t, expRes, actRes) + requireEqualSamples(t, expRes, actRes, true) } // Checking for expected data in the blocks. @@ -4876,6 +4900,14 @@ func TestOOOCompactionWithDisabledWriteLog(t *testing.T) { // missing after a restart while snapshot was enabled, but the query still returns the right // data from the mmap chunks. func TestOOOQueryAfterRestartWithSnapshotAndRemovedWBL(t *testing.T) { + for name, scenario := range sampleTypeScenarios { + t.Run(name, func(t *testing.T) { + testOOOQueryAfterRestartWithSnapshotAndRemovedWBL(t, scenario) + }) + } +} + +func testOOOQueryAfterRestartWithSnapshotAndRemovedWBL(t *testing.T, scenario sampleTypeScenario) { dir := t.TempDir() ctx := context.Background() @@ -4898,9 +4930,9 @@ func TestOOOQueryAfterRestartWithSnapshotAndRemovedWBL(t *testing.T) { app := db.Appender(context.Background()) for min := fromMins; min <= toMins; min++ { ts := min * time.Minute.Milliseconds() - _, err := app.Append(0, series1, ts, float64(ts)) + _, _, err := scenario.appendFunc(app, series1, ts, ts) require.NoError(t, err) - _, err = app.Append(0, series2, ts, float64(2*ts)) + _, _, err = scenario.appendFunc(app, series2, ts, 2*ts) require.NoError(t, err) } require.NoError(t, app.Commit()) @@ -4946,8 +4978,8 @@ func TestOOOQueryAfterRestartWithSnapshotAndRemovedWBL(t *testing.T) { series2Samples := make([]chunks.Sample, 0, toMins-fromMins+1) for min := fromMins; min <= toMins; min++ { ts := min * time.Minute.Milliseconds() - series1Samples = append(series1Samples, sample{ts, float64(ts), nil, nil}) - series2Samples = append(series2Samples, sample{ts, float64(2 * ts), nil, nil}) + series1Samples = append(series1Samples, scenario.sampleFunc(ts, ts)) + series2Samples = append(series2Samples, scenario.sampleFunc(ts, ts*2)) } expRes := map[string][]chunks.Sample{ series1.String(): series1Samples, @@ -4958,7 +4990,7 @@ func TestOOOQueryAfterRestartWithSnapshotAndRemovedWBL(t *testing.T) { require.NoError(t, err) actRes := query(t, q, labels.MustNewMatcher(labels.MatchRegexp, "foo", "bar.*")) - require.Equal(t, expRes, actRes) + requireEqualSamples(t, expRes, actRes, true) } // Checking for expected ooo data from mmap chunks. @@ -5159,6 +5191,14 @@ func Test_ChunkQuerier_OOOQuery(t *testing.T) { } func TestOOOAppendAndQuery(t *testing.T) { + for name, scenario := range sampleTypeScenarios { + t.Run(name, func(t *testing.T) { + testOOOAppendAndQuery(t, scenario) + }) + } +} + +func testOOOAppendAndQuery(t *testing.T, scenario sampleTypeScenario) { opts := DefaultOptions() opts.OutOfOrderCapMax = 30 opts.OutOfOrderTimeWindow = 4 * time.Hour.Milliseconds() @@ -5180,13 +5220,13 @@ func TestOOOAppendAndQuery(t *testing.T) { key := lbls.String() from, to := minutes(fromMins), minutes(toMins) for min := from; min <= to; min += time.Minute.Milliseconds() { - val := rand.Float64() - _, err := app.Append(0, lbls, min, val) + val := rand.Intn(1000) + _, s, err := scenario.appendFunc(app, lbls, min, int64(val)) if faceError { require.Error(t, err) } else { require.NoError(t, err) - appendedSamples[key] = append(appendedSamples[key], sample{t: min, f: val}) + appendedSamples[key] = append(appendedSamples[key], s) totalSamples++ } } @@ -5222,7 +5262,7 @@ func TestOOOAppendAndQuery(t *testing.T) { expSamples[k] = append(expSamples[k], s) } } - require.Equal(t, expSamples, seriesSet) + requireEqualSamples(t, expSamples, seriesSet, true) requireEqualOOOSamples(t, totalSamples-2, db) } @@ -5284,6 +5324,14 @@ func TestOOOAppendAndQuery(t *testing.T) { } func TestOOODisabled(t *testing.T) { + for name, scenario := range sampleTypeScenarios { + t.Run(name, func(t *testing.T) { + testOOODisabled(t, scenario) + }) + } +} + +func testOOODisabled(t *testing.T, scenario sampleTypeScenario) { opts := DefaultOptions() opts.OutOfOrderTimeWindow = 0 db := openTestDB(t, opts, nil) @@ -5297,19 +5345,19 @@ func TestOOODisabled(t *testing.T) { expSamples := make(map[string][]chunks.Sample) totalSamples := 0 failedSamples := 0 - addSample := func(lbls labels.Labels, fromMins, toMins int64, faceError bool) { + + addSample := func(db *DB, lbls labels.Labels, fromMins, toMins int64, faceError bool) { app := db.Appender(context.Background()) key := lbls.String() from, to := minutes(fromMins), minutes(toMins) for min := from; min <= to; min += time.Minute.Milliseconds() { - val := rand.Float64() - _, err := app.Append(0, lbls, min, val) + _, _, err := scenario.appendFunc(app, lbls, min, min) if faceError { require.Error(t, err) failedSamples++ } else { require.NoError(t, err) - expSamples[key] = append(expSamples[key], sample{t: min, f: val}) + expSamples[key] = append(expSamples[key], scenario.sampleFunc(min, min)) totalSamples++ } } @@ -5320,21 +5368,21 @@ func TestOOODisabled(t *testing.T) { } } - addSample(s1, 300, 300, false) // In-order samples. - addSample(s1, 250, 260, true) // Some ooo samples. - addSample(s1, 59, 59, true) // Out of time window. - addSample(s1, 60, 65, true) // At the edge of time window, also it would be "out of bound" without the ooo support. - addSample(s1, 59, 59, true) // Out of time window again. - addSample(s1, 301, 310, false) // More in-order samples. + addSample(db, s1, 300, 300, false) // In-order samples. + addSample(db, s1, 250, 260, true) // Some ooo samples. + addSample(db, s1, 59, 59, true) // Out of time window. + addSample(db, s1, 60, 65, true) // At the edge of time window, also it would be "out of bound" without the ooo support. + addSample(db, s1, 59, 59, true) // Out of time window again. + addSample(db, s1, 301, 310, false) // More in-order samples. querier, err := db.Querier(math.MinInt64, math.MaxInt64) require.NoError(t, err) seriesSet := query(t, querier, labels.MustNewMatcher(labels.MatchRegexp, "foo", "bar.")) - require.Equal(t, expSamples, seriesSet) + requireEqualSamples(t, expSamples, seriesSet, true) requireEqualOOOSamples(t, 0, db) require.Equal(t, float64(failedSamples), - prom_testutil.ToFloat64(db.head.metrics.outOfOrderSamples.WithLabelValues(sampleMetricTypeFloat))+prom_testutil.ToFloat64(db.head.metrics.outOfBoundSamples.WithLabelValues(sampleMetricTypeFloat)), + prom_testutil.ToFloat64(db.head.metrics.outOfOrderSamples.WithLabelValues(scenario.sampleType))+prom_testutil.ToFloat64(db.head.metrics.outOfBoundSamples.WithLabelValues(scenario.sampleType)), "number of ooo/oob samples mismatch") // Verifying that no OOO artifacts were generated. @@ -5349,6 +5397,14 @@ func TestOOODisabled(t *testing.T) { } func TestWBLAndMmapReplay(t *testing.T) { + for name, scenario := range sampleTypeScenarios { + t.Run(name, func(t *testing.T) { + testWBLAndMmapReplay(t, scenario) + }) + } +} + +func testWBLAndMmapReplay(t *testing.T, scenario sampleTypeScenario) { opts := DefaultOptions() opts.OutOfOrderCapMax = 30 opts.OutOfOrderTimeWindow = 4 * time.Hour.Milliseconds() @@ -5369,10 +5425,10 @@ func TestWBLAndMmapReplay(t *testing.T) { key := lbls.String() from, to := minutes(fromMins), minutes(toMins) for min := from; min <= to; min += time.Minute.Milliseconds() { - val := rand.Float64() - _, err := app.Append(0, lbls, min, val) + val := rand.Intn(1000) + _, s, err := scenario.appendFunc(app, lbls, min, int64(val)) require.NoError(t, err) - expSamples[key] = append(expSamples[key], sample{t: min, f: val}) + expSamples[key] = append(expSamples[key], s) totalSamples++ } require.NoError(t, app.Commit()) @@ -5390,7 +5446,7 @@ func TestWBLAndMmapReplay(t *testing.T) { }) exp[k] = v } - require.Equal(t, exp, seriesSet) + requireEqualSamples(t, exp, seriesSet, true) } // In-order samples. @@ -5413,10 +5469,7 @@ func TestWBLAndMmapReplay(t *testing.T) { chk, err := db.head.chunkDiskMapper.Chunk(mc.ref) require.NoError(t, err) it := chk.Iterator(nil) - for it.Next() == chunkenc.ValFloat { - ts, val := it.At() - s1MmapSamples = append(s1MmapSamples, sample{t: ts, f: val}) - } + s1MmapSamples = append(s1MmapSamples, samplesFromIterator(t, it)...) } require.NotEmpty(t, s1MmapSamples) @@ -5534,6 +5587,14 @@ func TestWBLAndMmapReplay(t *testing.T) { } func TestOOOCompactionFailure(t *testing.T) { + for name, scenario := range sampleTypeScenarios { + t.Run(name, func(t *testing.T) { + testOOOCompactionFailure(t, scenario) + }) + } +} + +func testOOOCompactionFailure(t *testing.T, scenario sampleTypeScenario) { dir := t.TempDir() ctx := context.Background() @@ -5554,7 +5615,7 @@ func TestOOOCompactionFailure(t *testing.T) { app := db.Appender(context.Background()) for min := fromMins; min <= toMins; min++ { ts := min * time.Minute.Milliseconds() - _, err := app.Append(0, series1, ts, float64(ts)) + _, _, err := scenario.appendFunc(app, series1, ts, ts) require.NoError(t, err) } require.NoError(t, app.Commit()) @@ -5642,7 +5703,7 @@ func TestOOOCompactionFailure(t *testing.T) { series1Samples := make([]chunks.Sample, 0, toMins-fromMins+1) for min := fromMins; min <= toMins; min++ { ts := min * time.Minute.Milliseconds() - series1Samples = append(series1Samples, sample{ts, float64(ts), nil, nil}) + series1Samples = append(series1Samples, scenario.sampleFunc(ts, ts)) } expRes := map[string][]chunks.Sample{ series1.String(): series1Samples, @@ -5650,9 +5711,8 @@ func TestOOOCompactionFailure(t *testing.T) { q, err := NewBlockQuerier(block, math.MinInt64, math.MaxInt64) require.NoError(t, err) - actRes := query(t, q, labels.MustNewMatcher(labels.MatchRegexp, "foo", "bar.*")) - require.Equal(t, expRes, actRes) + requireEqualSamples(t, expRes, actRes, true) } // Checking for expected data in the blocks. @@ -5819,6 +5879,14 @@ func TestWBLCorruption(t *testing.T) { } func TestOOOMmapCorruption(t *testing.T) { + for name, scenario := range sampleTypeScenarios { + t.Run(name, func(t *testing.T) { + testOOOMmapCorruption(t, scenario) + }) + } +} + +func testOOOMmapCorruption(t *testing.T, scenario sampleTypeScenario) { dir := t.TempDir() opts := DefaultOptions() @@ -5838,11 +5906,11 @@ func TestOOOMmapCorruption(t *testing.T) { app := db.Appender(context.Background()) for min := fromMins; min <= toMins; min++ { ts := min * time.Minute.Milliseconds() - _, err := app.Append(0, series1, ts, float64(ts)) + _, s, err := scenario.appendFunc(app, series1, ts, ts) require.NoError(t, err) - allSamples = append(allSamples, sample{t: ts, f: float64(ts)}) + allSamples = append(allSamples, s) if inMmapAfterCorruption { - expInMmapChunks = append(expInMmapChunks, sample{t: ts, f: float64(ts)}) + expInMmapChunks = append(expInMmapChunks, s) } } require.NoError(t, app.Commit()) @@ -5880,7 +5948,7 @@ func TestOOOMmapCorruption(t *testing.T) { require.NoError(t, err) actRes := query(t, q, labels.MustNewMatcher(labels.MatchRegexp, "foo", "bar.*")) - require.Equal(t, expRes, actRes) + requireEqualSamples(t, expRes, actRes, true) } verifySamples(allSamples) @@ -5942,6 +6010,14 @@ func TestOOOMmapCorruption(t *testing.T) { } func TestOutOfOrderRuntimeConfig(t *testing.T) { + for name, scenario := range sampleTypeScenarios { + t.Run(name, func(t *testing.T) { + testOutOfOrderRuntimeConfig(t, scenario) + }) + } +} + +func testOutOfOrderRuntimeConfig(t *testing.T, scenario sampleTypeScenario) { ctx := context.Background() getDB := func(oooTimeWindow int64) *DB { @@ -5975,10 +6051,10 @@ func TestOutOfOrderRuntimeConfig(t *testing.T) { app := db.Appender(context.Background()) for min := fromMins; min <= toMins; min++ { ts := min * time.Minute.Milliseconds() - _, err := app.Append(0, series1, ts, float64(ts)) + _, s, err := scenario.appendFunc(app, series1, ts, ts) if success { require.NoError(t, err) - allSamples = append(allSamples, sample{t: ts, f: float64(ts)}) + allSamples = append(allSamples, s) } else { require.Error(t, err) } @@ -6000,7 +6076,7 @@ func TestOutOfOrderRuntimeConfig(t *testing.T) { require.NoError(t, err) actRes := query(t, q, labels.MustNewMatcher(labels.MatchRegexp, "foo", "bar.*")) - require.Equal(t, expRes, actRes) + requireEqualSamples(t, expRes, actRes, true) } doOOOCompaction := func(t *testing.T, db *DB) { @@ -6173,12 +6249,20 @@ func TestOutOfOrderRuntimeConfig(t *testing.T) { } func TestNoGapAfterRestartWithOOO(t *testing.T) { + for name, scenario := range sampleTypeScenarios { + t.Run(name, func(t *testing.T) { + testNoGapAfterRestartWithOOO(t, scenario) + }) + } +} + +func testNoGapAfterRestartWithOOO(t *testing.T, scenario sampleTypeScenario) { series1 := labels.FromStrings("foo", "bar1") addSamples := func(t *testing.T, db *DB, fromMins, toMins int64, success bool) { app := db.Appender(context.Background()) for min := fromMins; min <= toMins; min++ { ts := min * time.Minute.Milliseconds() - _, err := app.Append(0, series1, ts, float64(ts)) + _, _, err := scenario.appendFunc(app, series1, ts, ts) if success { require.NoError(t, err) } else { @@ -6192,7 +6276,7 @@ func TestNoGapAfterRestartWithOOO(t *testing.T) { var expSamples []chunks.Sample for min := fromMins; min <= toMins; min++ { ts := min * time.Minute.Milliseconds() - expSamples = append(expSamples, sample{t: ts, f: float64(ts)}) + expSamples = append(expSamples, scenario.sampleFunc(ts, ts)) } expRes := map[string][]chunks.Sample{ @@ -6203,7 +6287,7 @@ func TestNoGapAfterRestartWithOOO(t *testing.T) { require.NoError(t, err) actRes := query(t, q, labels.MustNewMatcher(labels.MatchRegexp, "foo", "bar.*")) - require.Equal(t, expRes, actRes) + requireEqualSamples(t, expRes, actRes, true) } cases := []struct { @@ -6280,6 +6364,14 @@ func TestNoGapAfterRestartWithOOO(t *testing.T) { } func TestWblReplayAfterOOODisableAndRestart(t *testing.T) { + for name, scenario := range sampleTypeScenarios { + t.Run(name, func(t *testing.T) { + testWblReplayAfterOOODisableAndRestart(t, scenario) + }) + } +} + +func testWblReplayAfterOOODisableAndRestart(t *testing.T, scenario sampleTypeScenario) { dir := t.TempDir() opts := DefaultOptions() @@ -6298,9 +6390,9 @@ func TestWblReplayAfterOOODisableAndRestart(t *testing.T) { app := db.Appender(context.Background()) for min := fromMins; min <= toMins; min++ { ts := min * time.Minute.Milliseconds() - _, err := app.Append(0, series1, ts, float64(ts)) + _, s, err := scenario.appendFunc(app, series1, ts, ts) require.NoError(t, err) - allSamples = append(allSamples, sample{t: ts, f: float64(ts)}) + allSamples = append(allSamples, s) } require.NoError(t, app.Commit()) } @@ -6323,7 +6415,7 @@ func TestWblReplayAfterOOODisableAndRestart(t *testing.T) { require.NoError(t, err) actRes := query(t, q, labels.MustNewMatcher(labels.MatchRegexp, "foo", "bar.*")) - require.Equal(t, expRes, actRes) + requireEqualSamples(t, expRes, actRes, true) } verifySamples(allSamples) @@ -6339,6 +6431,14 @@ func TestWblReplayAfterOOODisableAndRestart(t *testing.T) { } func TestPanicOnApplyConfig(t *testing.T) { + for name, scenario := range sampleTypeScenarios { + t.Run(name, func(t *testing.T) { + testPanicOnApplyConfig(t, scenario) + }) + } +} + +func testPanicOnApplyConfig(t *testing.T, scenario sampleTypeScenario) { dir := t.TempDir() opts := DefaultOptions() @@ -6357,9 +6457,9 @@ func TestPanicOnApplyConfig(t *testing.T) { app := db.Appender(context.Background()) for min := fromMins; min <= toMins; min++ { ts := min * time.Minute.Milliseconds() - _, err := app.Append(0, series1, ts, float64(ts)) + _, s, err := scenario.appendFunc(app, series1, ts, ts) require.NoError(t, err) - allSamples = append(allSamples, sample{t: ts, f: float64(ts)}) + allSamples = append(allSamples, s) } require.NoError(t, app.Commit()) } @@ -6387,6 +6487,14 @@ func TestPanicOnApplyConfig(t *testing.T) { } func TestDiskFillingUpAfterDisablingOOO(t *testing.T) { + for name, scenario := range sampleTypeScenarios { + t.Run(name, func(t *testing.T) { + testDiskFillingUpAfterDisablingOOO(t, scenario) + }) + } +} + +func testDiskFillingUpAfterDisablingOOO(t *testing.T, scenario sampleTypeScenario) { dir := t.TempDir() ctx := context.Background() @@ -6406,9 +6514,9 @@ func TestDiskFillingUpAfterDisablingOOO(t *testing.T) { app := db.Appender(context.Background()) for min := fromMins; min <= toMins; min++ { ts := min * time.Minute.Milliseconds() - _, err := app.Append(0, series1, ts, float64(ts)) + _, s, err := scenario.appendFunc(app, series1, ts, ts) require.NoError(t, err) - allSamples = append(allSamples, sample{t: ts, f: float64(ts)}) + allSamples = append(allSamples, s) } require.NoError(t, app.Commit()) } @@ -7060,12 +7168,6 @@ Outer: require.NoError(t, writerErr) } -func requireEqualOOOSamples(t *testing.T, expectedSamples int, db *DB) { - require.Equal(t, float64(expectedSamples), - prom_testutil.ToFloat64(db.head.metrics.outOfOrderSamplesAppended.WithLabelValues(sampleMetricTypeFloat)), - "number of ooo appended samples mismatch") -} - type mockCompactorFn struct { planFn func() ([]string, error) compactFn func() ([]ulid.ULID, error) diff --git a/tsdb/head_test.go b/tsdb/head_test.go index 93f046e5b3..1eb0c0534d 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -2665,6 +2665,14 @@ func TestIsolationWithoutAdd(t *testing.T) { } func TestOutOfOrderSamplesMetric(t *testing.T) { + for name, scenario := range sampleTypeScenarios { + t.Run(name, func(t *testing.T) { + testOutOfOrderSamplesMetric(t, scenario) + }) + } +} + +func testOutOfOrderSamplesMetric(t *testing.T, scenario sampleTypeScenario) { dir := t.TempDir() db, err := Open(dir, nil, nil, DefaultOptions(), nil) @@ -2674,33 +2682,38 @@ func TestOutOfOrderSamplesMetric(t *testing.T) { }() db.DisableCompactions() + appendSample := func(appender storage.Appender, ts int64) (storage.SeriesRef, error) { + ref, _, err := scenario.appendFunc(appender, labels.FromStrings("a", "b"), ts, 99) + return ref, err + } + ctx := context.Background() app := db.Appender(ctx) for i := 1; i <= 5; i++ { - _, err = app.Append(0, labels.FromStrings("a", "b"), int64(i), 99) + _, err = appendSample(app, int64(i)) require.NoError(t, err) } require.NoError(t, app.Commit()) // Test out of order metric. - require.Equal(t, 0.0, prom_testutil.ToFloat64(db.head.metrics.outOfOrderSamples.WithLabelValues(sampleMetricTypeFloat))) + require.Equal(t, 0.0, prom_testutil.ToFloat64(db.head.metrics.outOfOrderSamples.WithLabelValues(scenario.sampleType))) app = db.Appender(ctx) - _, err = app.Append(0, labels.FromStrings("a", "b"), 2, 99) + _, err = appendSample(app, 2) require.Equal(t, storage.ErrOutOfOrderSample, err) - require.Equal(t, 1.0, prom_testutil.ToFloat64(db.head.metrics.outOfOrderSamples.WithLabelValues(sampleMetricTypeFloat))) + require.Equal(t, 1.0, prom_testutil.ToFloat64(db.head.metrics.outOfOrderSamples.WithLabelValues(scenario.sampleType))) - _, err = app.Append(0, labels.FromStrings("a", "b"), 3, 99) + _, err = appendSample(app, 3) require.Equal(t, storage.ErrOutOfOrderSample, err) - require.Equal(t, 2.0, prom_testutil.ToFloat64(db.head.metrics.outOfOrderSamples.WithLabelValues(sampleMetricTypeFloat))) + require.Equal(t, 2.0, prom_testutil.ToFloat64(db.head.metrics.outOfOrderSamples.WithLabelValues(scenario.sampleType))) - _, err = app.Append(0, labels.FromStrings("a", "b"), 4, 99) + _, err = appendSample(app, 4) require.Equal(t, storage.ErrOutOfOrderSample, err) - require.Equal(t, 3.0, prom_testutil.ToFloat64(db.head.metrics.outOfOrderSamples.WithLabelValues(sampleMetricTypeFloat))) + require.Equal(t, 3.0, prom_testutil.ToFloat64(db.head.metrics.outOfOrderSamples.WithLabelValues(scenario.sampleType))) require.NoError(t, app.Commit()) // Compact Head to test out of bound metric. app = db.Appender(ctx) - _, err = app.Append(0, labels.FromStrings("a", "b"), DefaultBlockDuration*2, 99) + _, err = appendSample(app, DefaultBlockDuration*2) require.NoError(t, err) require.NoError(t, app.Commit()) @@ -2709,36 +2722,36 @@ func TestOutOfOrderSamplesMetric(t *testing.T) { require.Greater(t, db.head.minValidTime.Load(), int64(0)) app = db.Appender(ctx) - _, err = app.Append(0, labels.FromStrings("a", "b"), db.head.minValidTime.Load()-2, 99) + _, err = appendSample(app, db.head.minValidTime.Load()-2) require.Equal(t, storage.ErrOutOfBounds, err) - require.Equal(t, 1.0, prom_testutil.ToFloat64(db.head.metrics.outOfBoundSamples.WithLabelValues(sampleMetricTypeFloat))) + require.Equal(t, 1.0, prom_testutil.ToFloat64(db.head.metrics.outOfBoundSamples.WithLabelValues(scenario.sampleType))) - _, err = app.Append(0, labels.FromStrings("a", "b"), db.head.minValidTime.Load()-1, 99) + _, err = appendSample(app, db.head.minValidTime.Load()-1) require.Equal(t, storage.ErrOutOfBounds, err) - require.Equal(t, 2.0, prom_testutil.ToFloat64(db.head.metrics.outOfBoundSamples.WithLabelValues(sampleMetricTypeFloat))) + require.Equal(t, 2.0, prom_testutil.ToFloat64(db.head.metrics.outOfBoundSamples.WithLabelValues(scenario.sampleType))) require.NoError(t, app.Commit()) // Some more valid samples for out of order. app = db.Appender(ctx) for i := 1; i <= 5; i++ { - _, err = app.Append(0, labels.FromStrings("a", "b"), db.head.minValidTime.Load()+DefaultBlockDuration+int64(i), 99) + _, err = appendSample(app, db.head.minValidTime.Load()+DefaultBlockDuration+int64(i)) require.NoError(t, err) } require.NoError(t, app.Commit()) // Test out of order metric. app = db.Appender(ctx) - _, err = app.Append(0, labels.FromStrings("a", "b"), db.head.minValidTime.Load()+DefaultBlockDuration+2, 99) + _, err = appendSample(app, db.head.minValidTime.Load()+DefaultBlockDuration+2) require.Equal(t, storage.ErrOutOfOrderSample, err) - require.Equal(t, 4.0, prom_testutil.ToFloat64(db.head.metrics.outOfOrderSamples.WithLabelValues(sampleMetricTypeFloat))) + require.Equal(t, 4.0, prom_testutil.ToFloat64(db.head.metrics.outOfOrderSamples.WithLabelValues(scenario.sampleType))) - _, err = app.Append(0, labels.FromStrings("a", "b"), db.head.minValidTime.Load()+DefaultBlockDuration+3, 99) + _, err = appendSample(app, db.head.minValidTime.Load()+DefaultBlockDuration+3) require.Equal(t, storage.ErrOutOfOrderSample, err) - require.Equal(t, 5.0, prom_testutil.ToFloat64(db.head.metrics.outOfOrderSamples.WithLabelValues(sampleMetricTypeFloat))) + require.Equal(t, 5.0, prom_testutil.ToFloat64(db.head.metrics.outOfOrderSamples.WithLabelValues(scenario.sampleType))) - _, err = app.Append(0, labels.FromStrings("a", "b"), db.head.minValidTime.Load()+DefaultBlockDuration+4, 99) + _, err = appendSample(app, db.head.minValidTime.Load()+DefaultBlockDuration+4) require.Equal(t, storage.ErrOutOfOrderSample, err) - require.Equal(t, 6.0, prom_testutil.ToFloat64(db.head.metrics.outOfOrderSamples.WithLabelValues(sampleMetricTypeFloat))) + require.Equal(t, 6.0, prom_testutil.ToFloat64(db.head.metrics.outOfOrderSamples.WithLabelValues(scenario.sampleType))) require.NoError(t, app.Commit()) } @@ -4801,6 +4814,14 @@ func TestWBLReplay(t *testing.T) { // TestOOOMmapReplay checks the replay at a low level. func TestOOOMmapReplay(t *testing.T) { + for name, scenario := range sampleTypeScenarios { + t.Run(name, func(t *testing.T) { + testOOOMmapReplay(t, scenario) + }) + } +} + +func testOOOMmapReplay(t *testing.T, scenario sampleTypeScenario) { dir := t.TempDir() wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, wlog.CompressionSnappy) require.NoError(t, err) @@ -4820,8 +4841,7 @@ func TestOOOMmapReplay(t *testing.T) { l := labels.FromStrings("foo", "bar") appendSample := func(mins int64) { app := h.Appender(context.Background()) - ts, v := mins*time.Minute.Milliseconds(), float64(mins) - _, err := app.Append(0, l, ts, v) + _, _, err := scenario.appendFunc(app, l, mins*time.Minute.Milliseconds(), mins) require.NoError(t, err) require.NoError(t, app.Commit()) } @@ -5096,6 +5116,14 @@ func TestReplayAfterMmapReplayError(t *testing.T) { } func TestOOOAppendWithNoSeries(t *testing.T) { + for name, scenario := range sampleTypeScenarios { + t.Run(name, func(t *testing.T) { + testOOOAppendWithNoSeries(t, scenario.appendFunc) + }) + } +} + +func testOOOAppendWithNoSeries(t *testing.T, appendFunc func(appender storage.Appender, lbls labels.Labels, ts, value int64) (storage.SeriesRef, sample, error)) { dir := t.TempDir() wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, wlog.CompressionSnappy) require.NoError(t, err) @@ -5116,7 +5144,7 @@ func TestOOOAppendWithNoSeries(t *testing.T) { appendSample := func(lbls labels.Labels, ts int64) { app := h.Appender(context.Background()) - _, err := app.Append(0, lbls, ts*time.Minute.Milliseconds(), float64(ts)) + _, _, err := appendFunc(app, lbls, ts*time.Minute.Milliseconds(), ts) require.NoError(t, err) require.NoError(t, app.Commit()) } @@ -5164,7 +5192,7 @@ func TestOOOAppendWithNoSeries(t *testing.T) { // Now 179m is too old. s4 := newLabels(4) app := h.Appender(context.Background()) - _, err = app.Append(0, s4, 179*time.Minute.Milliseconds(), float64(179)) + _, _, err = appendFunc(app, s4, 179*time.Minute.Milliseconds(), 179) require.Equal(t, storage.ErrTooOldSample, err) require.NoError(t, app.Rollback()) verifyOOOSamples(s3, 1) @@ -5177,6 +5205,14 @@ func TestOOOAppendWithNoSeries(t *testing.T) { } func TestHeadMinOOOTimeUpdate(t *testing.T) { + for name, scenario := range sampleTypeScenarios { + t.Run(name, func(t *testing.T) { + testHeadMinOOOTimeUpdate(t, scenario) + }) + } +} + +func testHeadMinOOOTimeUpdate(t *testing.T, scenario sampleTypeScenario) { dir := t.TempDir() wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, wlog.CompressionSnappy) require.NoError(t, err) @@ -5195,15 +5231,13 @@ func TestHeadMinOOOTimeUpdate(t *testing.T) { require.NoError(t, h.Init(0)) appendSample := func(ts int64) { - lbls := labels.FromStrings("foo", "bar") app := h.Appender(context.Background()) - _, err := app.Append(0, lbls, ts*time.Minute.Milliseconds(), float64(ts)) + _, _, err = scenario.appendFunc(app, labels.FromStrings("a", "b"), ts*time.Minute.Milliseconds(), 99.0) require.NoError(t, err) require.NoError(t, app.Commit()) } appendSample(300) // In-order sample. - require.Equal(t, int64(math.MaxInt64), h.MinOOOTime()) appendSample(295) // OOO sample. diff --git a/tsdb/ooo_head_read_test.go b/tsdb/ooo_head_read_test.go index ce1fff100f..9eaab13dcf 100644 --- a/tsdb/ooo_head_read_test.go +++ b/tsdb/ooo_head_read_test.go @@ -359,6 +359,15 @@ func TestOOOHeadIndexReader_Series(t *testing.T) { } func TestOOOHeadChunkReader_LabelValues(t *testing.T) { + for name, scenario := range sampleTypeScenarios { + t.Run(name, func(t *testing.T) { + testOOOHeadChunkReader_LabelValues(t, scenario) + }) + } +} + +//nolint:revive // unexported-return. +func testOOOHeadChunkReader_LabelValues(t *testing.T, scenario sampleTypeScenario) { chunkRange := int64(2000) head, _ := newTestHead(t, chunkRange, wlog.CompressionNone, true) t.Cleanup(func() { require.NoError(t, head.Close()) }) @@ -368,15 +377,15 @@ func TestOOOHeadChunkReader_LabelValues(t *testing.T) { app := head.Appender(context.Background()) // Add in-order samples - _, err := app.Append(0, labels.FromStrings("foo", "bar1"), 100, 1) + _, _, err := scenario.appendFunc(app, labels.FromStrings("foo", "bar1"), 100, int64(1)) require.NoError(t, err) - _, err = app.Append(0, labels.FromStrings("foo", "bar2"), 100, 2) + _, _, err = scenario.appendFunc(app, labels.FromStrings("foo", "bar2"), 100, int64(2)) require.NoError(t, err) // Add ooo samples for those series - _, err = app.Append(0, labels.FromStrings("foo", "bar1"), 90, 1) + _, _, err = scenario.appendFunc(app, labels.FromStrings("foo", "bar1"), 90, int64(1)) require.NoError(t, err) - _, err = app.Append(0, labels.FromStrings("foo", "bar2"), 90, 2) + _, _, err = scenario.appendFunc(app, labels.FromStrings("foo", "bar2"), 90, int64(2)) require.NoError(t, err) require.NoError(t, app.Commit()) @@ -453,6 +462,19 @@ func TestOOOHeadChunkReader_LabelValues(t *testing.T) { // It does so by appending out of order samples to the db and then initializing // an OOOHeadChunkReader to read chunks from it. func TestOOOHeadChunkReader_Chunk(t *testing.T) { + for name, scenario := range sampleTypeScenarios { + t.Run(name, func(t *testing.T) { + testOOOHeadChunkReader_Chunk(t, scenario) + }) + } +} + +// TestOOOHeadChunkReader_Chunk tests that the Chunk method works as expected. +// It does so by appending out of order samples to the db and then initializing +// an OOOHeadChunkReader to read chunks from it. +// +//nolint:revive // unexported-return. +func testOOOHeadChunkReader_Chunk(t *testing.T, scenario sampleTypeScenario) { opts := DefaultOptions() opts.OutOfOrderCapMax = 5 opts.OutOfOrderTimeWindow = 120 * time.Minute.Milliseconds() @@ -460,12 +482,6 @@ func TestOOOHeadChunkReader_Chunk(t *testing.T) { s1 := labels.FromStrings("l", "v1") minutes := func(m int64) int64 { return m * time.Minute.Milliseconds() } - appendSample := func(app storage.Appender, l labels.Labels, timestamp int64, value float64) storage.SeriesRef { - ref, err := app.Append(0, l, timestamp, value) - require.NoError(t, err) - return ref - } - t.Run("Getting a non existing chunk fails with not found error", func(t *testing.T) { db := newTestDBWithOpts(t, opts) @@ -484,7 +500,7 @@ func TestOOOHeadChunkReader_Chunk(t *testing.T) { queryMinT int64 queryMaxT int64 firstInOrderSampleAt int64 - inputSamples chunks.SampleSlice + inputSamples []tsValue expChunkError bool expChunksSamples []chunks.SampleSlice }{ @@ -493,9 +509,9 @@ func TestOOOHeadChunkReader_Chunk(t *testing.T) { queryMinT: minutes(0), queryMaxT: minutes(100), firstInOrderSampleAt: minutes(120), - inputSamples: chunks.SampleSlice{ - sample{t: minutes(30), f: float64(0)}, - sample{t: minutes(40), f: float64(0)}, + inputSamples: []tsValue{ + {Ts: minutes(30), V: 0}, + {Ts: minutes(40), V: 0}, }, expChunkError: false, // ts (in minutes) 0 10 20 30 40 50 60 70 80 90 100 @@ -504,8 +520,8 @@ func TestOOOHeadChunkReader_Chunk(t *testing.T) { // Output Graphically [--------] (With 2 samples) expChunksSamples: []chunks.SampleSlice{ { - sample{t: minutes(30), f: float64(0)}, - sample{t: minutes(40), f: float64(0)}, + scenario.sampleFunc(minutes(30), 0), + scenario.sampleFunc(minutes(40), 0), }, }, }, @@ -514,19 +530,8 @@ func TestOOOHeadChunkReader_Chunk(t *testing.T) { queryMinT: minutes(0), queryMaxT: minutes(100), firstInOrderSampleAt: minutes(120), - inputSamples: chunks.SampleSlice{ - // opts.OOOCapMax is 5 so these will be mmapped to the first mmapped chunk - sample{t: minutes(41), f: float64(0)}, - sample{t: minutes(42), f: float64(0)}, - sample{t: minutes(43), f: float64(0)}, - sample{t: minutes(44), f: float64(0)}, - sample{t: minutes(45), f: float64(0)}, - // The following samples will go to the head chunk, and we want it - // to overlap with the previous chunk - sample{t: minutes(30), f: float64(1)}, - sample{t: minutes(50), f: float64(1)}, - }, - expChunkError: false, + inputSamples: []tsValue{{Ts: minutes(41), V: 0}, {Ts: minutes(42), V: 0}, {Ts: minutes(43), V: 0}, {Ts: minutes(44), V: 0}, {Ts: minutes(45), V: 0}, {Ts: minutes(30), V: 1}, {Ts: minutes(50), V: 1}}, + expChunkError: false, // ts (in minutes) 0 10 20 30 40 50 60 70 80 90 100 // Query Interval [------------------------------------------------------------------------------------------] // Chunk 0 [---] (With 5 samples) @@ -534,13 +539,13 @@ func TestOOOHeadChunkReader_Chunk(t *testing.T) { // Output Graphically [-----------------] (With 7 samples) expChunksSamples: []chunks.SampleSlice{ { - sample{t: minutes(30), f: float64(1)}, - sample{t: minutes(41), f: float64(0)}, - sample{t: minutes(42), f: float64(0)}, - sample{t: minutes(43), f: float64(0)}, - sample{t: minutes(44), f: float64(0)}, - sample{t: minutes(45), f: float64(0)}, - sample{t: minutes(50), f: float64(1)}, + scenario.sampleFunc(minutes(30), 1), + scenario.sampleFunc(minutes(41), 0), + scenario.sampleFunc(minutes(42), 0), + scenario.sampleFunc(minutes(43), 0), + scenario.sampleFunc(minutes(44), 0), + scenario.sampleFunc(minutes(45), 0), + scenario.sampleFunc(minutes(50), 1), }, }, }, @@ -549,28 +554,28 @@ func TestOOOHeadChunkReader_Chunk(t *testing.T) { queryMinT: minutes(0), queryMaxT: minutes(100), firstInOrderSampleAt: minutes(120), - inputSamples: chunks.SampleSlice{ + inputSamples: []tsValue{ // Chunk 0 - sample{t: minutes(10), f: float64(0)}, - sample{t: minutes(12), f: float64(0)}, - sample{t: minutes(14), f: float64(0)}, - sample{t: minutes(16), f: float64(0)}, - sample{t: minutes(20), f: float64(0)}, + {Ts: minutes(10), V: 0}, + {Ts: minutes(12), V: 0}, + {Ts: minutes(14), V: 0}, + {Ts: minutes(16), V: 0}, + {Ts: minutes(20), V: 0}, // Chunk 1 - sample{t: minutes(20), f: float64(1)}, - sample{t: minutes(22), f: float64(1)}, - sample{t: minutes(24), f: float64(1)}, - sample{t: minutes(26), f: float64(1)}, - sample{t: minutes(29), f: float64(1)}, - // Chunk 2 - sample{t: minutes(30), f: float64(2)}, - sample{t: minutes(32), f: float64(2)}, - sample{t: minutes(34), f: float64(2)}, - sample{t: minutes(36), f: float64(2)}, - sample{t: minutes(40), f: float64(2)}, + {Ts: minutes(20), V: 1}, + {Ts: minutes(22), V: 1}, + {Ts: minutes(24), V: 1}, + {Ts: minutes(26), V: 1}, + {Ts: minutes(29), V: 1}, + // Chunk 3 + {Ts: minutes(30), V: 2}, + {Ts: minutes(32), V: 2}, + {Ts: minutes(34), V: 2}, + {Ts: minutes(36), V: 2}, + {Ts: minutes(40), V: 2}, // Head - sample{t: minutes(40), f: float64(3)}, - sample{t: minutes(50), f: float64(3)}, + {Ts: minutes(40), V: 3}, + {Ts: minutes(50), V: 3}, }, expChunkError: false, // ts (in minutes) 0 10 20 30 40 50 60 70 80 90 100 @@ -582,23 +587,23 @@ func TestOOOHeadChunkReader_Chunk(t *testing.T) { // Output Graphically [----------------][-----------------] expChunksSamples: []chunks.SampleSlice{ { - sample{t: minutes(10), f: float64(0)}, - sample{t: minutes(12), f: float64(0)}, - sample{t: minutes(14), f: float64(0)}, - sample{t: minutes(16), f: float64(0)}, - sample{t: minutes(20), f: float64(1)}, - sample{t: minutes(22), f: float64(1)}, - sample{t: minutes(24), f: float64(1)}, - sample{t: minutes(26), f: float64(1)}, - sample{t: minutes(29), f: float64(1)}, + scenario.sampleFunc(minutes(10), 0), + scenario.sampleFunc(minutes(12), 0), + scenario.sampleFunc(minutes(14), 0), + scenario.sampleFunc(minutes(16), 0), + scenario.sampleFunc(minutes(20), 1), + scenario.sampleFunc(minutes(22), 1), + scenario.sampleFunc(minutes(24), 1), + scenario.sampleFunc(minutes(26), 1), + scenario.sampleFunc(minutes(29), 1), }, { - sample{t: minutes(30), f: float64(2)}, - sample{t: minutes(32), f: float64(2)}, - sample{t: minutes(34), f: float64(2)}, - sample{t: minutes(36), f: float64(2)}, - sample{t: minutes(40), f: float64(3)}, - sample{t: minutes(50), f: float64(3)}, + scenario.sampleFunc(minutes(30), 2), + scenario.sampleFunc(minutes(32), 2), + scenario.sampleFunc(minutes(34), 2), + scenario.sampleFunc(minutes(36), 2), + scenario.sampleFunc(minutes(40), 3), + scenario.sampleFunc(minutes(50), 3), }, }, }, @@ -607,28 +612,28 @@ func TestOOOHeadChunkReader_Chunk(t *testing.T) { queryMinT: minutes(0), queryMaxT: minutes(100), firstInOrderSampleAt: minutes(120), - inputSamples: chunks.SampleSlice{ + inputSamples: []tsValue{ // Chunk 0 - sample{t: minutes(40), f: float64(0)}, - sample{t: minutes(42), f: float64(0)}, - sample{t: minutes(44), f: float64(0)}, - sample{t: minutes(46), f: float64(0)}, - sample{t: minutes(50), f: float64(0)}, + {Ts: minutes(40), V: 0}, + {Ts: minutes(42), V: 0}, + {Ts: minutes(44), V: 0}, + {Ts: minutes(46), V: 0}, + {Ts: minutes(50), V: 0}, // Chunk 1 - sample{t: minutes(30), f: float64(1)}, - sample{t: minutes(32), f: float64(1)}, - sample{t: minutes(34), f: float64(1)}, - sample{t: minutes(36), f: float64(1)}, - sample{t: minutes(40), f: float64(1)}, - // Chunk 2 - sample{t: minutes(20), f: float64(2)}, - sample{t: minutes(22), f: float64(2)}, - sample{t: minutes(24), f: float64(2)}, - sample{t: minutes(26), f: float64(2)}, - sample{t: minutes(29), f: float64(2)}, + {Ts: minutes(30), V: 1}, + {Ts: minutes(32), V: 1}, + {Ts: minutes(34), V: 1}, + {Ts: minutes(36), V: 1}, + {Ts: minutes(40), V: 1}, + // Chunk 3 + {Ts: minutes(20), V: 2}, + {Ts: minutes(22), V: 2}, + {Ts: minutes(24), V: 2}, + {Ts: minutes(26), V: 2}, + {Ts: minutes(29), V: 2}, // Head - sample{t: minutes(10), f: float64(3)}, - sample{t: minutes(20), f: float64(3)}, + {Ts: minutes(10), V: 3}, + {Ts: minutes(20), V: 3}, }, expChunkError: false, // ts (in minutes) 0 10 20 30 40 50 60 70 80 90 100 @@ -640,23 +645,23 @@ func TestOOOHeadChunkReader_Chunk(t *testing.T) { // Output Graphically [----------------][-----------------] expChunksSamples: []chunks.SampleSlice{ { - sample{t: minutes(10), f: float64(3)}, - sample{t: minutes(20), f: float64(2)}, - sample{t: minutes(22), f: float64(2)}, - sample{t: minutes(24), f: float64(2)}, - sample{t: minutes(26), f: float64(2)}, - sample{t: minutes(29), f: float64(2)}, + scenario.sampleFunc(minutes(10), 3), + scenario.sampleFunc(minutes(20), 2), + scenario.sampleFunc(minutes(22), 2), + scenario.sampleFunc(minutes(24), 2), + scenario.sampleFunc(minutes(26), 2), + scenario.sampleFunc(minutes(29), 2), }, { - sample{t: minutes(30), f: float64(1)}, - sample{t: minutes(32), f: float64(1)}, - sample{t: minutes(34), f: float64(1)}, - sample{t: minutes(36), f: float64(1)}, - sample{t: minutes(40), f: float64(0)}, - sample{t: minutes(42), f: float64(0)}, - sample{t: minutes(44), f: float64(0)}, - sample{t: minutes(46), f: float64(0)}, - sample{t: minutes(50), f: float64(0)}, + scenario.sampleFunc(minutes(30), 1), + scenario.sampleFunc(minutes(32), 1), + scenario.sampleFunc(minutes(34), 1), + scenario.sampleFunc(minutes(36), 1), + scenario.sampleFunc(minutes(40), 0), + scenario.sampleFunc(minutes(42), 0), + scenario.sampleFunc(minutes(44), 0), + scenario.sampleFunc(minutes(46), 0), + scenario.sampleFunc(minutes(50), 0), }, }, }, @@ -665,28 +670,28 @@ func TestOOOHeadChunkReader_Chunk(t *testing.T) { queryMinT: minutes(0), queryMaxT: minutes(100), firstInOrderSampleAt: minutes(120), - inputSamples: chunks.SampleSlice{ + inputSamples: []tsValue{ // Chunk 0 - sample{t: minutes(10), f: float64(0)}, - sample{t: minutes(12), f: float64(0)}, - sample{t: minutes(14), f: float64(0)}, - sample{t: minutes(16), f: float64(0)}, - sample{t: minutes(18), f: float64(0)}, + {Ts: minutes(10), V: 0}, + {Ts: minutes(12), V: 0}, + {Ts: minutes(14), V: 0}, + {Ts: minutes(16), V: 0}, + {Ts: minutes(18), V: 0}, // Chunk 1 - sample{t: minutes(20), f: float64(1)}, - sample{t: minutes(22), f: float64(1)}, - sample{t: minutes(24), f: float64(1)}, - sample{t: minutes(26), f: float64(1)}, - sample{t: minutes(28), f: float64(1)}, - // Chunk 2 - sample{t: minutes(30), f: float64(2)}, - sample{t: minutes(32), f: float64(2)}, - sample{t: minutes(34), f: float64(2)}, - sample{t: minutes(36), f: float64(2)}, - sample{t: minutes(38), f: float64(2)}, + {Ts: minutes(20), V: 1}, + {Ts: minutes(22), V: 1}, + {Ts: minutes(24), V: 1}, + {Ts: minutes(26), V: 1}, + {Ts: minutes(28), V: 1}, + // Chunk 3 + {Ts: minutes(30), V: 2}, + {Ts: minutes(32), V: 2}, + {Ts: minutes(34), V: 2}, + {Ts: minutes(36), V: 2}, + {Ts: minutes(38), V: 2}, // Head - sample{t: minutes(40), f: float64(3)}, - sample{t: minutes(42), f: float64(3)}, + {Ts: minutes(40), V: 3}, + {Ts: minutes(42), V: 3}, }, expChunkError: false, // ts (in minutes) 0 10 20 30 40 50 60 70 80 90 100 @@ -698,29 +703,29 @@ func TestOOOHeadChunkReader_Chunk(t *testing.T) { // Output Graphically [-------][-------][-------][--------] expChunksSamples: []chunks.SampleSlice{ { - sample{t: minutes(10), f: float64(0)}, - sample{t: minutes(12), f: float64(0)}, - sample{t: minutes(14), f: float64(0)}, - sample{t: minutes(16), f: float64(0)}, - sample{t: minutes(18), f: float64(0)}, + scenario.sampleFunc(minutes(10), 0), + scenario.sampleFunc(minutes(12), 0), + scenario.sampleFunc(minutes(14), 0), + scenario.sampleFunc(minutes(16), 0), + scenario.sampleFunc(minutes(18), 0), }, { - sample{t: minutes(20), f: float64(1)}, - sample{t: minutes(22), f: float64(1)}, - sample{t: minutes(24), f: float64(1)}, - sample{t: minutes(26), f: float64(1)}, - sample{t: minutes(28), f: float64(1)}, + scenario.sampleFunc(minutes(20), 1), + scenario.sampleFunc(minutes(22), 1), + scenario.sampleFunc(minutes(24), 1), + scenario.sampleFunc(minutes(26), 1), + scenario.sampleFunc(minutes(28), 1), }, { - sample{t: minutes(30), f: float64(2)}, - sample{t: minutes(32), f: float64(2)}, - sample{t: minutes(34), f: float64(2)}, - sample{t: minutes(36), f: float64(2)}, - sample{t: minutes(38), f: float64(2)}, + scenario.sampleFunc(minutes(30), 2), + scenario.sampleFunc(minutes(32), 2), + scenario.sampleFunc(minutes(34), 2), + scenario.sampleFunc(minutes(36), 2), + scenario.sampleFunc(minutes(38), 2), }, { - sample{t: minutes(40), f: float64(3)}, - sample{t: minutes(42), f: float64(3)}, + scenario.sampleFunc(minutes(40), 3), + scenario.sampleFunc(minutes(42), 3), }, }, }, @@ -729,22 +734,22 @@ func TestOOOHeadChunkReader_Chunk(t *testing.T) { queryMinT: minutes(0), queryMaxT: minutes(100), firstInOrderSampleAt: minutes(120), - inputSamples: chunks.SampleSlice{ + inputSamples: []tsValue{ // Chunk 0 - sample{t: minutes(10), f: float64(0)}, - sample{t: minutes(15), f: float64(0)}, - sample{t: minutes(20), f: float64(0)}, - sample{t: minutes(25), f: float64(0)}, - sample{t: minutes(30), f: float64(0)}, + {Ts: minutes(10), V: 0}, + {Ts: minutes(15), V: 0}, + {Ts: minutes(20), V: 0}, + {Ts: minutes(25), V: 0}, + {Ts: minutes(30), V: 0}, // Chunk 1 - sample{t: minutes(20), f: float64(1)}, - sample{t: minutes(25), f: float64(1)}, - sample{t: minutes(30), f: float64(1)}, - sample{t: minutes(35), f: float64(1)}, - sample{t: minutes(42), f: float64(1)}, + {Ts: minutes(20), V: 1}, + {Ts: minutes(25), V: 1}, + {Ts: minutes(30), V: 1}, + {Ts: minutes(35), V: 1}, + {Ts: minutes(42), V: 1}, // Chunk 2 Head - sample{t: minutes(32), f: float64(2)}, - sample{t: minutes(50), f: float64(2)}, + {Ts: minutes(32), V: 2}, + {Ts: minutes(50), V: 2}, }, expChunkError: false, // ts (in minutes) 0 10 20 30 40 50 60 70 80 90 100 @@ -755,15 +760,15 @@ func TestOOOHeadChunkReader_Chunk(t *testing.T) { // Output Graphically [-----------------------------------] expChunksSamples: []chunks.SampleSlice{ { - sample{t: minutes(10), f: float64(0)}, - sample{t: minutes(15), f: float64(0)}, - sample{t: minutes(20), f: float64(1)}, - sample{t: minutes(25), f: float64(1)}, - sample{t: minutes(30), f: float64(1)}, - sample{t: minutes(32), f: float64(2)}, - sample{t: minutes(35), f: float64(1)}, - sample{t: minutes(42), f: float64(1)}, - sample{t: minutes(50), f: float64(2)}, + scenario.sampleFunc(minutes(10), 0), + scenario.sampleFunc(minutes(15), 0), + scenario.sampleFunc(minutes(20), 1), + scenario.sampleFunc(minutes(25), 1), + scenario.sampleFunc(minutes(30), 1), + scenario.sampleFunc(minutes(32), 2), + scenario.sampleFunc(minutes(35), 1), + scenario.sampleFunc(minutes(42), 1), + scenario.sampleFunc(minutes(50), 2), }, }, }, @@ -772,22 +777,22 @@ func TestOOOHeadChunkReader_Chunk(t *testing.T) { queryMinT: minutes(12), queryMaxT: minutes(33), firstInOrderSampleAt: minutes(120), - inputSamples: chunks.SampleSlice{ + inputSamples: []tsValue{ // Chunk 0 - sample{t: minutes(10), f: float64(0)}, - sample{t: minutes(15), f: float64(0)}, - sample{t: minutes(20), f: float64(0)}, - sample{t: minutes(25), f: float64(0)}, - sample{t: minutes(30), f: float64(0)}, + {Ts: minutes(10), V: 0}, + {Ts: minutes(15), V: 0}, + {Ts: minutes(20), V: 0}, + {Ts: minutes(25), V: 0}, + {Ts: minutes(30), V: 0}, // Chunk 1 - sample{t: minutes(20), f: float64(1)}, - sample{t: minutes(25), f: float64(1)}, - sample{t: minutes(30), f: float64(1)}, - sample{t: minutes(35), f: float64(1)}, - sample{t: minutes(42), f: float64(1)}, + {Ts: minutes(20), V: 1}, + {Ts: minutes(25), V: 1}, + {Ts: minutes(30), V: 1}, + {Ts: minutes(35), V: 1}, + {Ts: minutes(42), V: 1}, // Chunk 2 Head - sample{t: minutes(32), f: float64(2)}, - sample{t: minutes(50), f: float64(2)}, + {Ts: minutes(32), V: 2}, + {Ts: minutes(50), V: 2}, }, expChunkError: false, // ts (in minutes) 0 10 20 30 40 50 60 70 80 90 100 @@ -798,15 +803,15 @@ func TestOOOHeadChunkReader_Chunk(t *testing.T) { // Output Graphically [-----------------------------------] expChunksSamples: []chunks.SampleSlice{ { - sample{t: minutes(10), f: float64(0)}, - sample{t: minutes(15), f: float64(0)}, - sample{t: minutes(20), f: float64(1)}, - sample{t: minutes(25), f: float64(1)}, - sample{t: minutes(30), f: float64(1)}, - sample{t: minutes(32), f: float64(2)}, - sample{t: minutes(35), f: float64(1)}, - sample{t: minutes(42), f: float64(1)}, - sample{t: minutes(50), f: float64(2)}, + scenario.sampleFunc(minutes(10), 0), + scenario.sampleFunc(minutes(15), 0), + scenario.sampleFunc(minutes(20), 1), + scenario.sampleFunc(minutes(25), 1), + scenario.sampleFunc(minutes(30), 1), + scenario.sampleFunc(minutes(32), 2), + scenario.sampleFunc(minutes(35), 1), + scenario.sampleFunc(minutes(42), 1), + scenario.sampleFunc(minutes(50), 2), }, }, }, @@ -817,13 +822,14 @@ func TestOOOHeadChunkReader_Chunk(t *testing.T) { db := newTestDBWithOpts(t, opts) app := db.Appender(context.Background()) - s1Ref := appendSample(app, s1, tc.firstInOrderSampleAt, float64(tc.firstInOrderSampleAt/1*time.Minute.Milliseconds())) + s1Ref, _, _ := scenario.appendFunc(app, s1, tc.firstInOrderSampleAt, tc.firstInOrderSampleAt/1*time.Minute.Milliseconds()) require.NoError(t, app.Commit()) // OOO few samples for s1. app = db.Appender(context.Background()) for _, s := range tc.inputSamples { - appendSample(app, s1, s.T(), s.F()) + _, _, err := scenario.appendFunc(app, s1, s.Ts, s.V) + require.NoError(t, err) } require.NoError(t, app.Commit()) @@ -843,13 +849,9 @@ func TestOOOHeadChunkReader_Chunk(t *testing.T) { require.NoError(t, err) require.Nil(t, c) - var resultSamples chunks.SampleSlice it := iterable.Iterator(nil) - for it.Next() == chunkenc.ValFloat { - t, v := it.At() - resultSamples = append(resultSamples, sample{t: t, f: v}) - } - require.Equal(t, tc.expChunksSamples[i], resultSamples) + resultSamples := samplesFromIterator(t, it) + compareSamples(t, s1.String(), tc.expChunksSamples[i], resultSamples, true) } }) } @@ -864,6 +866,24 @@ func TestOOOHeadChunkReader_Chunk(t *testing.T) { // - Response B comes from : Series(), in parallel new samples added to the head, then Chunk() // - A == B func TestOOOHeadChunkReader_Chunk_ConsistentQueryResponseDespiteOfHeadExpanding(t *testing.T) { + for name, scenario := range sampleTypeScenarios { + t.Run(name, func(t *testing.T) { + testOOOHeadChunkReader_Chunk_ConsistentQueryResponseDespiteOfHeadExpanding(t, scenario) + }) + } +} + +// TestOOOHeadChunkReader_Chunk_ConsistentQueryResponseDespiteOfHeadExpanding tests +// that if a query comes and performs a Series() call followed by a Chunks() call +// the response is consistent with the data seen by Series() even if the OOO +// head receives more samples before Chunks() is called. +// An example: +// - Response A comes from: Series() then Chunk() +// - Response B comes from : Series(), in parallel new samples added to the head, then Chunk() +// - A == B +// +//nolint:revive // unexported-return. +func testOOOHeadChunkReader_Chunk_ConsistentQueryResponseDespiteOfHeadExpanding(t *testing.T, scenario sampleTypeScenario) { opts := DefaultOptions() opts.OutOfOrderCapMax = 5 opts.OutOfOrderTimeWindow = 120 * time.Minute.Milliseconds() @@ -871,19 +891,13 @@ func TestOOOHeadChunkReader_Chunk_ConsistentQueryResponseDespiteOfHeadExpanding( s1 := labels.FromStrings("l", "v1") minutes := func(m int64) int64 { return m * time.Minute.Milliseconds() } - appendSample := func(app storage.Appender, l labels.Labels, timestamp int64, value float64) storage.SeriesRef { - ref, err := app.Append(0, l, timestamp, value) - require.NoError(t, err) - return ref - } - tests := []struct { name string queryMinT int64 queryMaxT int64 firstInOrderSampleAt int64 - initialSamples chunks.SampleSlice - samplesAfterSeriesCall chunks.SampleSlice + initialSamples []tsValue + samplesAfterSeriesCall []tsValue expChunkError bool expChunksSamples []chunks.SampleSlice }{ @@ -892,21 +906,21 @@ func TestOOOHeadChunkReader_Chunk_ConsistentQueryResponseDespiteOfHeadExpanding( queryMinT: minutes(0), queryMaxT: minutes(100), firstInOrderSampleAt: minutes(120), - initialSamples: chunks.SampleSlice{ + initialSamples: []tsValue{ // Chunk 0 - sample{t: minutes(20), f: float64(0)}, - sample{t: minutes(22), f: float64(0)}, - sample{t: minutes(24), f: float64(0)}, - sample{t: minutes(26), f: float64(0)}, - sample{t: minutes(30), f: float64(0)}, + {Ts: minutes(20), V: 0}, + {Ts: minutes(22), V: 0}, + {Ts: minutes(24), V: 0}, + {Ts: minutes(26), V: 0}, + {Ts: minutes(30), V: 0}, // Chunk 1 Head - sample{t: minutes(25), f: float64(1)}, - sample{t: minutes(35), f: float64(1)}, + {Ts: minutes(25), V: 1}, + {Ts: minutes(35), V: 1}, }, - samplesAfterSeriesCall: chunks.SampleSlice{ - sample{t: minutes(10), f: float64(1)}, - sample{t: minutes(32), f: float64(1)}, - sample{t: minutes(50), f: float64(1)}, + samplesAfterSeriesCall: []tsValue{ + {Ts: minutes(10), V: 1}, + {Ts: minutes(32), V: 1}, + {Ts: minutes(50), V: 1}, }, expChunkError: false, // ts (in minutes) 0 10 20 30 40 50 60 70 80 90 100 @@ -918,39 +932,40 @@ func TestOOOHeadChunkReader_Chunk_ConsistentQueryResponseDespiteOfHeadExpanding( // Output Graphically [------------] (With 8 samples, samples newer than lastmint or older than lastmaxt are omitted but the ones in between are kept) expChunksSamples: []chunks.SampleSlice{ { - sample{t: minutes(20), f: float64(0)}, - sample{t: minutes(22), f: float64(0)}, - sample{t: minutes(24), f: float64(0)}, - sample{t: minutes(25), f: float64(1)}, - sample{t: minutes(26), f: float64(0)}, - sample{t: minutes(30), f: float64(0)}, - sample{t: minutes(35), f: float64(1)}, + scenario.sampleFunc(minutes(20), 0), + scenario.sampleFunc(minutes(22), 0), + scenario.sampleFunc(minutes(24), 0), + scenario.sampleFunc(minutes(25), 1), + scenario.sampleFunc(minutes(26), 0), + scenario.sampleFunc(minutes(30), 0), + scenario.sampleFunc(minutes(32), 1), // This sample was added after Series() but before Chunk() and its in between the lastmint and maxt so it should be kept + scenario.sampleFunc(minutes(35), 1), }, }, }, { - name: "After Series() previous head gets mmapped after getting samples, new head gets new samples also overlapping, none of these should appear in the response.", + name: "After Series() prev head gets mmapped after getting samples, new head gets new samples also overlapping, none of these should appear in response.", queryMinT: minutes(0), queryMaxT: minutes(100), firstInOrderSampleAt: minutes(120), - initialSamples: chunks.SampleSlice{ + initialSamples: []tsValue{ // Chunk 0 - sample{t: minutes(20), f: float64(0)}, - sample{t: minutes(22), f: float64(0)}, - sample{t: minutes(24), f: float64(0)}, - sample{t: minutes(26), f: float64(0)}, - sample{t: minutes(30), f: float64(0)}, + {Ts: minutes(20), V: 0}, + {Ts: minutes(22), V: 0}, + {Ts: minutes(24), V: 0}, + {Ts: minutes(26), V: 0}, + {Ts: minutes(30), V: 0}, // Chunk 1 Head - sample{t: minutes(25), f: float64(1)}, - sample{t: minutes(35), f: float64(1)}, + {Ts: minutes(25), V: 1}, + {Ts: minutes(35), V: 1}, }, - samplesAfterSeriesCall: chunks.SampleSlice{ - sample{t: minutes(10), f: float64(1)}, - sample{t: minutes(32), f: float64(1)}, - sample{t: minutes(50), f: float64(1)}, + samplesAfterSeriesCall: []tsValue{ + {Ts: minutes(10), V: 1}, + {Ts: minutes(32), V: 1}, + {Ts: minutes(50), V: 1}, // Chunk 1 gets mmapped and Chunk 2, the new head is born - sample{t: minutes(25), f: float64(2)}, - sample{t: minutes(31), f: float64(2)}, + {Ts: minutes(25), V: 2}, + {Ts: minutes(31), V: 2}, }, expChunkError: false, // ts (in minutes) 0 10 20 30 40 50 60 70 80 90 100 @@ -963,6 +978,7 @@ func TestOOOHeadChunkReader_Chunk_ConsistentQueryResponseDespiteOfHeadExpanding( // Output Graphically [------------] (8 samples) It has 5 from Chunk 0 and 3 from Chunk 1 expChunksSamples: []chunks.SampleSlice{ { +<<<<<<< HEAD sample{t: minutes(20), f: float64(0)}, sample{t: minutes(22), f: float64(0)}, sample{t: minutes(24), f: float64(0)}, @@ -970,6 +986,25 @@ func TestOOOHeadChunkReader_Chunk_ConsistentQueryResponseDespiteOfHeadExpanding( sample{t: minutes(26), f: float64(0)}, sample{t: minutes(30), f: float64(0)}, sample{t: minutes(35), f: float64(1)}, +||||||| parent of 2795db1c2 (Update tsdb tests to use test utils.) + sample{t: minutes(20), f: float64(0)}, + sample{t: minutes(22), f: float64(0)}, + sample{t: minutes(24), f: float64(0)}, + sample{t: minutes(25), f: float64(1)}, + sample{t: minutes(26), f: float64(0)}, + sample{t: minutes(30), f: float64(0)}, + sample{t: minutes(32), f: float64(1)}, // This sample was added after Series() but before Chunk() and its in between the lastmint and maxt so it should be kept + sample{t: minutes(35), f: float64(1)}, +======= + scenario.sampleFunc(minutes(20), 0), + scenario.sampleFunc(minutes(22), 0), + scenario.sampleFunc(minutes(24), 0), + scenario.sampleFunc(minutes(25), 1), + scenario.sampleFunc(minutes(26), 0), + scenario.sampleFunc(minutes(30), 0), + scenario.sampleFunc(minutes(32), 1), // This sample was added after Series() but before Chunk() and its in between the lastmint and maxt so it should be kept + scenario.sampleFunc(minutes(35), 1), +>>>>>>> 2795db1c2 (Update tsdb tests to use test utils.) }, }, }, @@ -980,13 +1015,15 @@ func TestOOOHeadChunkReader_Chunk_ConsistentQueryResponseDespiteOfHeadExpanding( db := newTestDBWithOpts(t, opts) app := db.Appender(context.Background()) - s1Ref := appendSample(app, s1, tc.firstInOrderSampleAt, float64(tc.firstInOrderSampleAt/1*time.Minute.Milliseconds())) + s1Ref, _, err := scenario.appendFunc(app, s1, tc.firstInOrderSampleAt, tc.firstInOrderSampleAt/1*time.Minute.Milliseconds()) + require.NoError(t, err) require.NoError(t, app.Commit()) // OOO few samples for s1. app = db.Appender(context.Background()) for _, s := range tc.initialSamples { - appendSample(app, s1, s.T(), s.F()) + _, _, err := scenario.appendFunc(app, s1, s.Ts, s.V) + require.NoError(t, err) } require.NoError(t, app.Commit()) @@ -995,7 +1032,7 @@ func TestOOOHeadChunkReader_Chunk_ConsistentQueryResponseDespiteOfHeadExpanding( ir := NewOOOHeadIndexReader(db.head, tc.queryMinT, tc.queryMaxT, 0) var chks []chunks.Meta var b labels.ScratchBuilder - err := ir.Series(s1Ref, &b, &chks) + err = ir.Series(s1Ref, &b, &chks) require.NoError(t, err) require.Equal(t, len(tc.expChunksSamples), len(chks)) @@ -1003,7 +1040,8 @@ func TestOOOHeadChunkReader_Chunk_ConsistentQueryResponseDespiteOfHeadExpanding( // OOO few samples for s1. app = db.Appender(context.Background()) for _, s := range tc.samplesAfterSeriesCall { - appendSample(app, s1, s.T(), s.F()) + _, _, err = scenario.appendFunc(app, s1, s.Ts, s.V) + require.NoError(t, err) } require.NoError(t, app.Commit()) @@ -1014,13 +1052,9 @@ func TestOOOHeadChunkReader_Chunk_ConsistentQueryResponseDespiteOfHeadExpanding( require.NoError(t, err) require.Nil(t, c) - var resultSamples chunks.SampleSlice it := iterable.Iterator(nil) - for it.Next() == chunkenc.ValFloat { - ts, v := it.At() - resultSamples = append(resultSamples, sample{t: ts, f: v}) - } - require.Equal(t, tc.expChunksSamples[i], resultSamples) + resultSamples := samplesFromIterator(t, it) + compareSamples(t, s1.String(), tc.expChunksSamples[i], resultSamples, true) } }) }