Reproduce populateWithDelChunkSeriesIterator corrupting chunk meta

When handling recoded histogram chunks the min time of the chunk is
updated by mistake. It should only update when the chunk is completely
new.

Signed-off-by: György Krajcsovits <gyorgy.krajcsovits@grafana.com>
This commit is contained in:
György Krajcsovits 2024-10-18 08:54:37 +02:00
parent efc43d0714
commit e6a682f046
4 changed files with 91 additions and 12 deletions

View file

@ -4757,7 +4757,7 @@ func TestMultipleEncodingsCommitOrder(t *testing.T) {
seriesSet := query(t, querier, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar1"))
require.Len(t, seriesSet, 1)
gotSamples := seriesSet[series1.String()]
requireEqualSamples(t, series1.String(), expSamples, gotSamples, true)
requireEqualSamples(t, series1.String(), expSamples, gotSamples, requireEqualSamplesIgnoreCounterResets)
// Verify chunks querier.
chunkQuerier, err := db.ChunkQuerier(minT, maxT)
@ -4775,7 +4775,7 @@ func TestMultipleEncodingsCommitOrder(t *testing.T) {
gotChunkSamples = append(gotChunkSamples, smpls...)
require.NoError(t, it.Err())
}
requireEqualSamples(t, series1.String(), expSamples, gotChunkSamples, true)
requireEqualSamples(t, series1.String(), expSamples, gotChunkSamples, requireEqualSamplesIgnoreCounterResets)
}
var expSamples []chunks.Sample
@ -5704,16 +5704,33 @@ func testQuerierOOOQuery(t *testing.T,
gotSamples := seriesSet[series1.String()]
require.NotNil(t, gotSamples)
require.Len(t, seriesSet, 1)
requireEqualSamples(t, series1.String(), expSamples, gotSamples, true)
requireEqualSamples(t, series1.String(), expSamples, gotSamples, requireEqualSamplesIgnoreCounterResets)
requireEqualOOOSamples(t, oooSamples, db)
})
}
}
func TestChunkQuerierOOOQuery(t *testing.T) {
nBucketHistogram := func(n int64) *histogram.Histogram {
h := &histogram.Histogram{
Count: uint64(n),
Sum: float64(n),
}
if n == 0 {
h.PositiveSpans = []histogram.Span{}
h.PositiveBuckets = []int64{}
return h
}
h.PositiveSpans = []histogram.Span{{Offset: 0, Length: uint32(n)}}
h.PositiveBuckets = make([]int64, n)
h.PositiveBuckets[0] = 1
return h
}
scenarios := map[string]struct {
appendFunc func(app storage.Appender, ts int64, counterReset bool) (storage.SeriesRef, error)
sampleFunc func(ts int64) chunks.Sample
appendFunc func(app storage.Appender, ts int64, counterReset bool) (storage.SeriesRef, error)
sampleFunc func(ts int64) chunks.Sample
checkInUseBucket bool
}{
"float": {
appendFunc: func(app storage.Appender, ts int64, counterReset bool) (storage.SeriesRef, error) {
@ -5758,10 +5775,24 @@ func TestChunkQuerierOOOQuery(t *testing.T) {
return sample{t: ts, h: tsdbutil.GenerateTestHistogram(int(ts))}
},
},
"integer histogram with recode": {
// Histograms have increasing number of buckets so their chunks are recoded.
appendFunc: func(app storage.Appender, ts int64, counterReset bool) (storage.SeriesRef, error) {
n := ts / time.Minute.Milliseconds()
return app.AppendHistogram(0, labels.FromStrings("foo", "bar1"), ts, nBucketHistogram(n), nil)
},
sampleFunc: func(ts int64) chunks.Sample {
n := ts / time.Minute.Milliseconds()
return sample{t: ts, h: nBucketHistogram(n)}
},
// Only check in-use buckets for this scenario.
// Recoding adds empty buckets.
checkInUseBucket: true,
},
}
for name, scenario := range scenarios {
t.Run(name, func(t *testing.T) {
testChunkQuerierOOOQuery(t, scenario.appendFunc, scenario.sampleFunc)
testChunkQuerierOOOQuery(t, scenario.appendFunc, scenario.sampleFunc, scenario.checkInUseBucket)
})
}
}
@ -5769,6 +5800,7 @@ func TestChunkQuerierOOOQuery(t *testing.T) {
func testChunkQuerierOOOQuery(t *testing.T,
appendFunc func(app storage.Appender, ts int64, counterReset bool) (storage.SeriesRef, error),
sampleFunc func(ts int64) chunks.Sample,
checkInUseBuckets bool,
) {
opts := DefaultOptions()
opts.OutOfOrderCapMax = 30
@ -6008,10 +6040,28 @@ func testChunkQuerierOOOQuery(t *testing.T,
it := chunk.Chunk.Iterator(nil)
smpls, err := storage.ExpandSamples(it, newSample)
require.NoError(t, err)
// Verify that no sample is outside the chunk's time range.
for i, s := range smpls {
switch i {
case 0:
require.Equal(t, chunk.MinTime, s.T(), "first sample %v not at chunk min time %v", s, chunk.MinTime)
case len(smpls) - 1:
require.Equal(t, chunk.MaxTime, s.T(), "last sample %v not at chunk max time %v", s, chunk.MaxTime)
default:
require.GreaterOrEqual(t, s.T(), chunk.MinTime, "sample %v before chunk min time %v", s, chunk.MinTime)
require.LessOrEqual(t, s.T(), chunk.MaxTime, "sample %v after chunk max time %v", s, chunk.MaxTime)
}
}
gotSamples = append(gotSamples, smpls...)
require.NoError(t, it.Err())
}
requireEqualSamples(t, series1.String(), expSamples, gotSamples, true)
if checkInUseBuckets {
requireEqualSamples(t, series1.String(), expSamples, gotSamples, requireEqualSamplesIgnoreCounterResets, requireEqualSamplesInUseBucketCompare)
} else {
requireEqualSamples(t, series1.String(), expSamples, gotSamples, requireEqualSamplesIgnoreCounterResets)
}
})
}
}

View file

@ -5178,7 +5178,7 @@ func testWBLReplay(t *testing.T, scenario sampleTypeScenario) {
// Passing in true for the 'ignoreCounterResets' parameter prevents differences in counter reset headers
// from being factored in to the sample comparison
// TODO(fionaliao): understand counter reset behaviour, might want to modify this later
requireEqualSamples(t, l.String(), expOOOSamples, actOOOSamples, true)
requireEqualSamples(t, l.String(), expOOOSamples, actOOOSamples, requireEqualSamplesIgnoreCounterResets)
require.NoError(t, h.Close())
}

View file

@ -878,7 +878,7 @@ func testOOOHeadChunkReader_Chunk(t *testing.T, scenario sampleTypeScenario) {
}
resultSamples, err := storage.ExpandSamples(it, nil)
require.NoError(t, err)
requireEqualSamples(t, s1.String(), tc.expChunksSamples[i], resultSamples, true)
requireEqualSamples(t, s1.String(), tc.expChunksSamples[i], resultSamples, requireEqualSamplesIgnoreCounterResets)
}
})
}
@ -1054,7 +1054,7 @@ func testOOOHeadChunkReader_Chunk_ConsistentQueryResponseDespiteOfHeadExpanding(
it := iterable.Iterator(nil)
resultSamples, err := storage.ExpandSamples(it, nil)
require.NoError(t, err)
requireEqualSamples(t, s1.String(), tc.expChunksSamples[i], resultSamples, true)
requireEqualSamples(t, s1.String(), tc.expChunksSamples[i], resultSamples, requireEqualSamplesIgnoreCounterResets)
}
})
}

View file

@ -111,7 +111,7 @@ func requireEqualSeries(t *testing.T, expected, actual map[string][]chunks.Sampl
for name, expectedItem := range expected {
actualItem, ok := actual[name]
require.True(t, ok, "Expected series %s not found", name)
requireEqualSamples(t, name, expectedItem, actualItem, ignoreCounterResets)
requireEqualSamples(t, name, expectedItem, actualItem, requireEqualSamplesIgnoreCounterResets)
}
for name := range actual {
_, ok := expected[name]
@ -126,7 +126,28 @@ func requireEqualOOOSamples(t *testing.T, expectedSamples int, db *DB) {
"number of ooo appended samples mismatch")
}
func requireEqualSamples(t *testing.T, name string, expected, actual []chunks.Sample, ignoreCounterResets bool) {
type requireEqualSamplesOption int
const (
requireEqualSamplesNoOption requireEqualSamplesOption = iota
requireEqualSamplesIgnoreCounterResets
requireEqualSamplesInUseBucketCompare
)
func requireEqualSamples(t *testing.T, name string, expected, actual []chunks.Sample, options ...requireEqualSamplesOption) {
var (
ignoreCounterResets bool
inUseBucketCompare bool
)
for _, option := range options {
switch option {
case requireEqualSamplesIgnoreCounterResets:
ignoreCounterResets = true
case requireEqualSamplesInUseBucketCompare:
inUseBucketCompare = true
}
}
require.Equal(t, len(expected), len(actual), "Length not equal to expected for %s", name)
for i, s := range expected {
expectedSample := s
@ -144,6 +165,10 @@ func requireEqualSamples(t *testing.T, name string, expected, actual []chunks.Sa
} else {
require.Equal(t, expectedHist.CounterResetHint, actualHist.CounterResetHint, "Sample header doesn't match for %s[%d] at ts %d, expected: %s, actual: %s", name, i, expectedSample.T(), counterResetAsString(expectedHist.CounterResetHint), counterResetAsString(actualHist.CounterResetHint))
}
if inUseBucketCompare {
expectedSample.H().Compact(0)
actualSample.H().Compact(0)
}
require.Equal(t, expectedHist, actualHist, "Sample doesn't match for %s[%d] at ts %d", name, i, expectedSample.T())
}
case s.FH() != nil:
@ -156,6 +181,10 @@ func requireEqualSamples(t *testing.T, name string, expected, actual []chunks.Sa
} else {
require.Equal(t, expectedHist.CounterResetHint, actualHist.CounterResetHint, "Sample header doesn't match for %s[%d] at ts %d, expected: %s, actual: %s", name, i, expectedSample.T(), counterResetAsString(expectedHist.CounterResetHint), counterResetAsString(actualHist.CounterResetHint))
}
if inUseBucketCompare {
expectedSample.FH().Compact(0)
actualSample.FH().Compact(0)
}
require.Equal(t, expectedHist, actualHist, "Sample doesn't match for %s[%d] at ts %d", name, i, expectedSample.T())
}
default: