From e6a682f046f946ef73c17dc2840c1906139fda90 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gy=C3=B6rgy=20Krajcsovits?= Date: Fri, 18 Oct 2024 08:54:37 +0200 Subject: [PATCH] Reproduce populateWithDelChunkSeriesIterator corrupting chunk meta MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When handling recoded histogram chunks the min time of the chunk is updated by mistake. It should only update when the chunk is completely new. Signed-off-by: György Krajcsovits --- tsdb/db_test.go | 64 +++++++++++++++++++++++++++++++++----- tsdb/head_test.go | 2 +- tsdb/ooo_head_read_test.go | 4 +-- tsdb/testutil.go | 33 ++++++++++++++++++-- 4 files changed, 91 insertions(+), 12 deletions(-) diff --git a/tsdb/db_test.go b/tsdb/db_test.go index 08417e889..3f0fc0c84 100644 --- a/tsdb/db_test.go +++ b/tsdb/db_test.go @@ -4757,7 +4757,7 @@ func TestMultipleEncodingsCommitOrder(t *testing.T) { seriesSet := query(t, querier, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar1")) require.Len(t, seriesSet, 1) gotSamples := seriesSet[series1.String()] - requireEqualSamples(t, series1.String(), expSamples, gotSamples, true) + requireEqualSamples(t, series1.String(), expSamples, gotSamples, requireEqualSamplesIgnoreCounterResets) // Verify chunks querier. chunkQuerier, err := db.ChunkQuerier(minT, maxT) @@ -4775,7 +4775,7 @@ func TestMultipleEncodingsCommitOrder(t *testing.T) { gotChunkSamples = append(gotChunkSamples, smpls...) require.NoError(t, it.Err()) } - requireEqualSamples(t, series1.String(), expSamples, gotChunkSamples, true) + requireEqualSamples(t, series1.String(), expSamples, gotChunkSamples, requireEqualSamplesIgnoreCounterResets) } var expSamples []chunks.Sample @@ -5704,16 +5704,33 @@ func testQuerierOOOQuery(t *testing.T, gotSamples := seriesSet[series1.String()] require.NotNil(t, gotSamples) require.Len(t, seriesSet, 1) - requireEqualSamples(t, series1.String(), expSamples, gotSamples, true) + requireEqualSamples(t, series1.String(), expSamples, gotSamples, requireEqualSamplesIgnoreCounterResets) requireEqualOOOSamples(t, oooSamples, db) }) } } func TestChunkQuerierOOOQuery(t *testing.T) { + nBucketHistogram := func(n int64) *histogram.Histogram { + h := &histogram.Histogram{ + Count: uint64(n), + Sum: float64(n), + } + if n == 0 { + h.PositiveSpans = []histogram.Span{} + h.PositiveBuckets = []int64{} + return h + } + h.PositiveSpans = []histogram.Span{{Offset: 0, Length: uint32(n)}} + h.PositiveBuckets = make([]int64, n) + h.PositiveBuckets[0] = 1 + return h + } + scenarios := map[string]struct { - appendFunc func(app storage.Appender, ts int64, counterReset bool) (storage.SeriesRef, error) - sampleFunc func(ts int64) chunks.Sample + appendFunc func(app storage.Appender, ts int64, counterReset bool) (storage.SeriesRef, error) + sampleFunc func(ts int64) chunks.Sample + checkInUseBucket bool }{ "float": { appendFunc: func(app storage.Appender, ts int64, counterReset bool) (storage.SeriesRef, error) { @@ -5758,10 +5775,24 @@ func TestChunkQuerierOOOQuery(t *testing.T) { return sample{t: ts, h: tsdbutil.GenerateTestHistogram(int(ts))} }, }, + "integer histogram with recode": { + // Histograms have increasing number of buckets so their chunks are recoded. + appendFunc: func(app storage.Appender, ts int64, counterReset bool) (storage.SeriesRef, error) { + n := ts / time.Minute.Milliseconds() + return app.AppendHistogram(0, labels.FromStrings("foo", "bar1"), ts, nBucketHistogram(n), nil) + }, + sampleFunc: func(ts int64) chunks.Sample { + n := ts / time.Minute.Milliseconds() + return sample{t: ts, h: nBucketHistogram(n)} + }, + // Only check in-use buckets for this scenario. + // Recoding adds empty buckets. + checkInUseBucket: true, + }, } for name, scenario := range scenarios { t.Run(name, func(t *testing.T) { - testChunkQuerierOOOQuery(t, scenario.appendFunc, scenario.sampleFunc) + testChunkQuerierOOOQuery(t, scenario.appendFunc, scenario.sampleFunc, scenario.checkInUseBucket) }) } } @@ -5769,6 +5800,7 @@ func TestChunkQuerierOOOQuery(t *testing.T) { func testChunkQuerierOOOQuery(t *testing.T, appendFunc func(app storage.Appender, ts int64, counterReset bool) (storage.SeriesRef, error), sampleFunc func(ts int64) chunks.Sample, + checkInUseBuckets bool, ) { opts := DefaultOptions() opts.OutOfOrderCapMax = 30 @@ -6008,10 +6040,28 @@ func testChunkQuerierOOOQuery(t *testing.T, it := chunk.Chunk.Iterator(nil) smpls, err := storage.ExpandSamples(it, newSample) require.NoError(t, err) + + // Verify that no sample is outside the chunk's time range. + for i, s := range smpls { + switch i { + case 0: + require.Equal(t, chunk.MinTime, s.T(), "first sample %v not at chunk min time %v", s, chunk.MinTime) + case len(smpls) - 1: + require.Equal(t, chunk.MaxTime, s.T(), "last sample %v not at chunk max time %v", s, chunk.MaxTime) + default: + require.GreaterOrEqual(t, s.T(), chunk.MinTime, "sample %v before chunk min time %v", s, chunk.MinTime) + require.LessOrEqual(t, s.T(), chunk.MaxTime, "sample %v after chunk max time %v", s, chunk.MaxTime) + } + } + gotSamples = append(gotSamples, smpls...) require.NoError(t, it.Err()) } - requireEqualSamples(t, series1.String(), expSamples, gotSamples, true) + if checkInUseBuckets { + requireEqualSamples(t, series1.String(), expSamples, gotSamples, requireEqualSamplesIgnoreCounterResets, requireEqualSamplesInUseBucketCompare) + } else { + requireEqualSamples(t, series1.String(), expSamples, gotSamples, requireEqualSamplesIgnoreCounterResets) + } }) } } diff --git a/tsdb/head_test.go b/tsdb/head_test.go index 671e85cd7..cc9daa97f 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -5178,7 +5178,7 @@ func testWBLReplay(t *testing.T, scenario sampleTypeScenario) { // Passing in true for the 'ignoreCounterResets' parameter prevents differences in counter reset headers // from being factored in to the sample comparison // TODO(fionaliao): understand counter reset behaviour, might want to modify this later - requireEqualSamples(t, l.String(), expOOOSamples, actOOOSamples, true) + requireEqualSamples(t, l.String(), expOOOSamples, actOOOSamples, requireEqualSamplesIgnoreCounterResets) require.NoError(t, h.Close()) } diff --git a/tsdb/ooo_head_read_test.go b/tsdb/ooo_head_read_test.go index 8d1527e05..17f551dd7 100644 --- a/tsdb/ooo_head_read_test.go +++ b/tsdb/ooo_head_read_test.go @@ -878,7 +878,7 @@ func testOOOHeadChunkReader_Chunk(t *testing.T, scenario sampleTypeScenario) { } resultSamples, err := storage.ExpandSamples(it, nil) require.NoError(t, err) - requireEqualSamples(t, s1.String(), tc.expChunksSamples[i], resultSamples, true) + requireEqualSamples(t, s1.String(), tc.expChunksSamples[i], resultSamples, requireEqualSamplesIgnoreCounterResets) } }) } @@ -1054,7 +1054,7 @@ func testOOOHeadChunkReader_Chunk_ConsistentQueryResponseDespiteOfHeadExpanding( it := iterable.Iterator(nil) resultSamples, err := storage.ExpandSamples(it, nil) require.NoError(t, err) - requireEqualSamples(t, s1.String(), tc.expChunksSamples[i], resultSamples, true) + requireEqualSamples(t, s1.String(), tc.expChunksSamples[i], resultSamples, requireEqualSamplesIgnoreCounterResets) } }) } diff --git a/tsdb/testutil.go b/tsdb/testutil.go index ab6aab79f..03587f4e2 100644 --- a/tsdb/testutil.go +++ b/tsdb/testutil.go @@ -111,7 +111,7 @@ func requireEqualSeries(t *testing.T, expected, actual map[string][]chunks.Sampl for name, expectedItem := range expected { actualItem, ok := actual[name] require.True(t, ok, "Expected series %s not found", name) - requireEqualSamples(t, name, expectedItem, actualItem, ignoreCounterResets) + requireEqualSamples(t, name, expectedItem, actualItem, requireEqualSamplesIgnoreCounterResets) } for name := range actual { _, ok := expected[name] @@ -126,7 +126,28 @@ func requireEqualOOOSamples(t *testing.T, expectedSamples int, db *DB) { "number of ooo appended samples mismatch") } -func requireEqualSamples(t *testing.T, name string, expected, actual []chunks.Sample, ignoreCounterResets bool) { +type requireEqualSamplesOption int + +const ( + requireEqualSamplesNoOption requireEqualSamplesOption = iota + requireEqualSamplesIgnoreCounterResets + requireEqualSamplesInUseBucketCompare +) + +func requireEqualSamples(t *testing.T, name string, expected, actual []chunks.Sample, options ...requireEqualSamplesOption) { + var ( + ignoreCounterResets bool + inUseBucketCompare bool + ) + for _, option := range options { + switch option { + case requireEqualSamplesIgnoreCounterResets: + ignoreCounterResets = true + case requireEqualSamplesInUseBucketCompare: + inUseBucketCompare = true + } + } + require.Equal(t, len(expected), len(actual), "Length not equal to expected for %s", name) for i, s := range expected { expectedSample := s @@ -144,6 +165,10 @@ func requireEqualSamples(t *testing.T, name string, expected, actual []chunks.Sa } else { require.Equal(t, expectedHist.CounterResetHint, actualHist.CounterResetHint, "Sample header doesn't match for %s[%d] at ts %d, expected: %s, actual: %s", name, i, expectedSample.T(), counterResetAsString(expectedHist.CounterResetHint), counterResetAsString(actualHist.CounterResetHint)) } + if inUseBucketCompare { + expectedSample.H().Compact(0) + actualSample.H().Compact(0) + } require.Equal(t, expectedHist, actualHist, "Sample doesn't match for %s[%d] at ts %d", name, i, expectedSample.T()) } case s.FH() != nil: @@ -156,6 +181,10 @@ func requireEqualSamples(t *testing.T, name string, expected, actual []chunks.Sa } else { require.Equal(t, expectedHist.CounterResetHint, actualHist.CounterResetHint, "Sample header doesn't match for %s[%d] at ts %d, expected: %s, actual: %s", name, i, expectedSample.T(), counterResetAsString(expectedHist.CounterResetHint), counterResetAsString(actualHist.CounterResetHint)) } + if inUseBucketCompare { + expectedSample.FH().Compact(0) + actualSample.FH().Compact(0) + } require.Equal(t, expectedHist, actualHist, "Sample doesn't match for %s[%d] at ts %d", name, i, expectedSample.T()) } default: