From e6a682f046f946ef73c17dc2840c1906139fda90 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gy=C3=B6rgy=20Krajcsovits?= Date: Fri, 18 Oct 2024 08:54:37 +0200 Subject: [PATCH 1/2] Reproduce populateWithDelChunkSeriesIterator corrupting chunk meta MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When handling recoded histogram chunks the min time of the chunk is updated by mistake. It should only update when the chunk is completely new. Signed-off-by: György Krajcsovits --- tsdb/db_test.go | 64 +++++++++++++++++++++++++++++++++----- tsdb/head_test.go | 2 +- tsdb/ooo_head_read_test.go | 4 +-- tsdb/testutil.go | 33 ++++++++++++++++++-- 4 files changed, 91 insertions(+), 12 deletions(-) diff --git a/tsdb/db_test.go b/tsdb/db_test.go index 08417e889..3f0fc0c84 100644 --- a/tsdb/db_test.go +++ b/tsdb/db_test.go @@ -4757,7 +4757,7 @@ func TestMultipleEncodingsCommitOrder(t *testing.T) { seriesSet := query(t, querier, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar1")) require.Len(t, seriesSet, 1) gotSamples := seriesSet[series1.String()] - requireEqualSamples(t, series1.String(), expSamples, gotSamples, true) + requireEqualSamples(t, series1.String(), expSamples, gotSamples, requireEqualSamplesIgnoreCounterResets) // Verify chunks querier. chunkQuerier, err := db.ChunkQuerier(minT, maxT) @@ -4775,7 +4775,7 @@ func TestMultipleEncodingsCommitOrder(t *testing.T) { gotChunkSamples = append(gotChunkSamples, smpls...) require.NoError(t, it.Err()) } - requireEqualSamples(t, series1.String(), expSamples, gotChunkSamples, true) + requireEqualSamples(t, series1.String(), expSamples, gotChunkSamples, requireEqualSamplesIgnoreCounterResets) } var expSamples []chunks.Sample @@ -5704,16 +5704,33 @@ func testQuerierOOOQuery(t *testing.T, gotSamples := seriesSet[series1.String()] require.NotNil(t, gotSamples) require.Len(t, seriesSet, 1) - requireEqualSamples(t, series1.String(), expSamples, gotSamples, true) + requireEqualSamples(t, series1.String(), expSamples, gotSamples, requireEqualSamplesIgnoreCounterResets) requireEqualOOOSamples(t, oooSamples, db) }) } } func TestChunkQuerierOOOQuery(t *testing.T) { + nBucketHistogram := func(n int64) *histogram.Histogram { + h := &histogram.Histogram{ + Count: uint64(n), + Sum: float64(n), + } + if n == 0 { + h.PositiveSpans = []histogram.Span{} + h.PositiveBuckets = []int64{} + return h + } + h.PositiveSpans = []histogram.Span{{Offset: 0, Length: uint32(n)}} + h.PositiveBuckets = make([]int64, n) + h.PositiveBuckets[0] = 1 + return h + } + scenarios := map[string]struct { - appendFunc func(app storage.Appender, ts int64, counterReset bool) (storage.SeriesRef, error) - sampleFunc func(ts int64) chunks.Sample + appendFunc func(app storage.Appender, ts int64, counterReset bool) (storage.SeriesRef, error) + sampleFunc func(ts int64) chunks.Sample + checkInUseBucket bool }{ "float": { appendFunc: func(app storage.Appender, ts int64, counterReset bool) (storage.SeriesRef, error) { @@ -5758,10 +5775,24 @@ func TestChunkQuerierOOOQuery(t *testing.T) { return sample{t: ts, h: tsdbutil.GenerateTestHistogram(int(ts))} }, }, + "integer histogram with recode": { + // Histograms have increasing number of buckets so their chunks are recoded. + appendFunc: func(app storage.Appender, ts int64, counterReset bool) (storage.SeriesRef, error) { + n := ts / time.Minute.Milliseconds() + return app.AppendHistogram(0, labels.FromStrings("foo", "bar1"), ts, nBucketHistogram(n), nil) + }, + sampleFunc: func(ts int64) chunks.Sample { + n := ts / time.Minute.Milliseconds() + return sample{t: ts, h: nBucketHistogram(n)} + }, + // Only check in-use buckets for this scenario. + // Recoding adds empty buckets. + checkInUseBucket: true, + }, } for name, scenario := range scenarios { t.Run(name, func(t *testing.T) { - testChunkQuerierOOOQuery(t, scenario.appendFunc, scenario.sampleFunc) + testChunkQuerierOOOQuery(t, scenario.appendFunc, scenario.sampleFunc, scenario.checkInUseBucket) }) } } @@ -5769,6 +5800,7 @@ func TestChunkQuerierOOOQuery(t *testing.T) { func testChunkQuerierOOOQuery(t *testing.T, appendFunc func(app storage.Appender, ts int64, counterReset bool) (storage.SeriesRef, error), sampleFunc func(ts int64) chunks.Sample, + checkInUseBuckets bool, ) { opts := DefaultOptions() opts.OutOfOrderCapMax = 30 @@ -6008,10 +6040,28 @@ func testChunkQuerierOOOQuery(t *testing.T, it := chunk.Chunk.Iterator(nil) smpls, err := storage.ExpandSamples(it, newSample) require.NoError(t, err) + + // Verify that no sample is outside the chunk's time range. + for i, s := range smpls { + switch i { + case 0: + require.Equal(t, chunk.MinTime, s.T(), "first sample %v not at chunk min time %v", s, chunk.MinTime) + case len(smpls) - 1: + require.Equal(t, chunk.MaxTime, s.T(), "last sample %v not at chunk max time %v", s, chunk.MaxTime) + default: + require.GreaterOrEqual(t, s.T(), chunk.MinTime, "sample %v before chunk min time %v", s, chunk.MinTime) + require.LessOrEqual(t, s.T(), chunk.MaxTime, "sample %v after chunk max time %v", s, chunk.MaxTime) + } + } + gotSamples = append(gotSamples, smpls...) require.NoError(t, it.Err()) } - requireEqualSamples(t, series1.String(), expSamples, gotSamples, true) + if checkInUseBuckets { + requireEqualSamples(t, series1.String(), expSamples, gotSamples, requireEqualSamplesIgnoreCounterResets, requireEqualSamplesInUseBucketCompare) + } else { + requireEqualSamples(t, series1.String(), expSamples, gotSamples, requireEqualSamplesIgnoreCounterResets) + } }) } } diff --git a/tsdb/head_test.go b/tsdb/head_test.go index 671e85cd7..cc9daa97f 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -5178,7 +5178,7 @@ func testWBLReplay(t *testing.T, scenario sampleTypeScenario) { // Passing in true for the 'ignoreCounterResets' parameter prevents differences in counter reset headers // from being factored in to the sample comparison // TODO(fionaliao): understand counter reset behaviour, might want to modify this later - requireEqualSamples(t, l.String(), expOOOSamples, actOOOSamples, true) + requireEqualSamples(t, l.String(), expOOOSamples, actOOOSamples, requireEqualSamplesIgnoreCounterResets) require.NoError(t, h.Close()) } diff --git a/tsdb/ooo_head_read_test.go b/tsdb/ooo_head_read_test.go index 8d1527e05..17f551dd7 100644 --- a/tsdb/ooo_head_read_test.go +++ b/tsdb/ooo_head_read_test.go @@ -878,7 +878,7 @@ func testOOOHeadChunkReader_Chunk(t *testing.T, scenario sampleTypeScenario) { } resultSamples, err := storage.ExpandSamples(it, nil) require.NoError(t, err) - requireEqualSamples(t, s1.String(), tc.expChunksSamples[i], resultSamples, true) + requireEqualSamples(t, s1.String(), tc.expChunksSamples[i], resultSamples, requireEqualSamplesIgnoreCounterResets) } }) } @@ -1054,7 +1054,7 @@ func testOOOHeadChunkReader_Chunk_ConsistentQueryResponseDespiteOfHeadExpanding( it := iterable.Iterator(nil) resultSamples, err := storage.ExpandSamples(it, nil) require.NoError(t, err) - requireEqualSamples(t, s1.String(), tc.expChunksSamples[i], resultSamples, true) + requireEqualSamples(t, s1.String(), tc.expChunksSamples[i], resultSamples, requireEqualSamplesIgnoreCounterResets) } }) } diff --git a/tsdb/testutil.go b/tsdb/testutil.go index ab6aab79f..03587f4e2 100644 --- a/tsdb/testutil.go +++ b/tsdb/testutil.go @@ -111,7 +111,7 @@ func requireEqualSeries(t *testing.T, expected, actual map[string][]chunks.Sampl for name, expectedItem := range expected { actualItem, ok := actual[name] require.True(t, ok, "Expected series %s not found", name) - requireEqualSamples(t, name, expectedItem, actualItem, ignoreCounterResets) + requireEqualSamples(t, name, expectedItem, actualItem, requireEqualSamplesIgnoreCounterResets) } for name := range actual { _, ok := expected[name] @@ -126,7 +126,28 @@ func requireEqualOOOSamples(t *testing.T, expectedSamples int, db *DB) { "number of ooo appended samples mismatch") } -func requireEqualSamples(t *testing.T, name string, expected, actual []chunks.Sample, ignoreCounterResets bool) { +type requireEqualSamplesOption int + +const ( + requireEqualSamplesNoOption requireEqualSamplesOption = iota + requireEqualSamplesIgnoreCounterResets + requireEqualSamplesInUseBucketCompare +) + +func requireEqualSamples(t *testing.T, name string, expected, actual []chunks.Sample, options ...requireEqualSamplesOption) { + var ( + ignoreCounterResets bool + inUseBucketCompare bool + ) + for _, option := range options { + switch option { + case requireEqualSamplesIgnoreCounterResets: + ignoreCounterResets = true + case requireEqualSamplesInUseBucketCompare: + inUseBucketCompare = true + } + } + require.Equal(t, len(expected), len(actual), "Length not equal to expected for %s", name) for i, s := range expected { expectedSample := s @@ -144,6 +165,10 @@ func requireEqualSamples(t *testing.T, name string, expected, actual []chunks.Sa } else { require.Equal(t, expectedHist.CounterResetHint, actualHist.CounterResetHint, "Sample header doesn't match for %s[%d] at ts %d, expected: %s, actual: %s", name, i, expectedSample.T(), counterResetAsString(expectedHist.CounterResetHint), counterResetAsString(actualHist.CounterResetHint)) } + if inUseBucketCompare { + expectedSample.H().Compact(0) + actualSample.H().Compact(0) + } require.Equal(t, expectedHist, actualHist, "Sample doesn't match for %s[%d] at ts %d", name, i, expectedSample.T()) } case s.FH() != nil: @@ -156,6 +181,10 @@ func requireEqualSamples(t *testing.T, name string, expected, actual []chunks.Sa } else { require.Equal(t, expectedHist.CounterResetHint, actualHist.CounterResetHint, "Sample header doesn't match for %s[%d] at ts %d, expected: %s, actual: %s", name, i, expectedSample.T(), counterResetAsString(expectedHist.CounterResetHint), counterResetAsString(actualHist.CounterResetHint)) } + if inUseBucketCompare { + expectedSample.FH().Compact(0) + actualSample.FH().Compact(0) + } require.Equal(t, expectedHist, actualHist, "Sample doesn't match for %s[%d] at ts %d", name, i, expectedSample.T()) } default: From a4083f14e866223cab66125beef5c347ae51dcf7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gy=C3=B6rgy=20Krajcsovits?= Date: Fri, 18 Oct 2024 09:06:37 +0200 Subject: [PATCH 2/2] Fix populateWithDelChunkSeriesIterator corrupting chunk meta MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When handling recoded histogram chunks the min time of the chunk is updated by mistake. It should only update when the chunk is completely new. Otherwise the ongoing chunk's meta will be later than the previously written samples in it. Same bug as https://github.com/prometheus/prometheus/pull/14629 Signed-off-by: György Krajcsovits --- tsdb/querier.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tsdb/querier.go b/tsdb/querier.go index 1083cbba0..b80faf881 100644 --- a/tsdb/querier.go +++ b/tsdb/querier.go @@ -1022,9 +1022,9 @@ func (p *populateWithDelChunkSeriesIterator) populateChunksFromIterable() bool { if newChunk != nil { if !recoded { p.chunksFromIterable = append(p.chunksFromIterable, chunks.Meta{Chunk: currentChunk, MinTime: cmint, MaxTime: cmaxt}) + cmint = t } currentChunk = newChunk - cmint = t } cmaxt = t