mirror of
https://github.com/prometheus/prometheus.git
synced 2025-01-15 07:47:31 -08:00
Merge pull request #15180 from prometheus/ooo-nh-corrupt-chunk
fix(tsdb): populateWithDelChunkSeriesIterator corrupting chunk meta
This commit is contained in:
commit
763cbdf35f
|
@ -4757,7 +4757,7 @@ func TestMultipleEncodingsCommitOrder(t *testing.T) {
|
|||
seriesSet := query(t, querier, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar1"))
|
||||
require.Len(t, seriesSet, 1)
|
||||
gotSamples := seriesSet[series1.String()]
|
||||
requireEqualSamples(t, series1.String(), expSamples, gotSamples, true)
|
||||
requireEqualSamples(t, series1.String(), expSamples, gotSamples, requireEqualSamplesIgnoreCounterResets)
|
||||
|
||||
// Verify chunks querier.
|
||||
chunkQuerier, err := db.ChunkQuerier(minT, maxT)
|
||||
|
@ -4775,7 +4775,7 @@ func TestMultipleEncodingsCommitOrder(t *testing.T) {
|
|||
gotChunkSamples = append(gotChunkSamples, smpls...)
|
||||
require.NoError(t, it.Err())
|
||||
}
|
||||
requireEqualSamples(t, series1.String(), expSamples, gotChunkSamples, true)
|
||||
requireEqualSamples(t, series1.String(), expSamples, gotChunkSamples, requireEqualSamplesIgnoreCounterResets)
|
||||
}
|
||||
|
||||
var expSamples []chunks.Sample
|
||||
|
@ -5704,16 +5704,33 @@ func testQuerierOOOQuery(t *testing.T,
|
|||
gotSamples := seriesSet[series1.String()]
|
||||
require.NotNil(t, gotSamples)
|
||||
require.Len(t, seriesSet, 1)
|
||||
requireEqualSamples(t, series1.String(), expSamples, gotSamples, true)
|
||||
requireEqualSamples(t, series1.String(), expSamples, gotSamples, requireEqualSamplesIgnoreCounterResets)
|
||||
requireEqualOOOSamples(t, oooSamples, db)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestChunkQuerierOOOQuery(t *testing.T) {
|
||||
nBucketHistogram := func(n int64) *histogram.Histogram {
|
||||
h := &histogram.Histogram{
|
||||
Count: uint64(n),
|
||||
Sum: float64(n),
|
||||
}
|
||||
if n == 0 {
|
||||
h.PositiveSpans = []histogram.Span{}
|
||||
h.PositiveBuckets = []int64{}
|
||||
return h
|
||||
}
|
||||
h.PositiveSpans = []histogram.Span{{Offset: 0, Length: uint32(n)}}
|
||||
h.PositiveBuckets = make([]int64, n)
|
||||
h.PositiveBuckets[0] = 1
|
||||
return h
|
||||
}
|
||||
|
||||
scenarios := map[string]struct {
|
||||
appendFunc func(app storage.Appender, ts int64, counterReset bool) (storage.SeriesRef, error)
|
||||
sampleFunc func(ts int64) chunks.Sample
|
||||
checkInUseBucket bool
|
||||
}{
|
||||
"float": {
|
||||
appendFunc: func(app storage.Appender, ts int64, counterReset bool) (storage.SeriesRef, error) {
|
||||
|
@ -5758,10 +5775,24 @@ func TestChunkQuerierOOOQuery(t *testing.T) {
|
|||
return sample{t: ts, h: tsdbutil.GenerateTestHistogram(int(ts))}
|
||||
},
|
||||
},
|
||||
"integer histogram with recode": {
|
||||
// Histograms have increasing number of buckets so their chunks are recoded.
|
||||
appendFunc: func(app storage.Appender, ts int64, counterReset bool) (storage.SeriesRef, error) {
|
||||
n := ts / time.Minute.Milliseconds()
|
||||
return app.AppendHistogram(0, labels.FromStrings("foo", "bar1"), ts, nBucketHistogram(n), nil)
|
||||
},
|
||||
sampleFunc: func(ts int64) chunks.Sample {
|
||||
n := ts / time.Minute.Milliseconds()
|
||||
return sample{t: ts, h: nBucketHistogram(n)}
|
||||
},
|
||||
// Only check in-use buckets for this scenario.
|
||||
// Recoding adds empty buckets.
|
||||
checkInUseBucket: true,
|
||||
},
|
||||
}
|
||||
for name, scenario := range scenarios {
|
||||
t.Run(name, func(t *testing.T) {
|
||||
testChunkQuerierOOOQuery(t, scenario.appendFunc, scenario.sampleFunc)
|
||||
testChunkQuerierOOOQuery(t, scenario.appendFunc, scenario.sampleFunc, scenario.checkInUseBucket)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
@ -5769,6 +5800,7 @@ func TestChunkQuerierOOOQuery(t *testing.T) {
|
|||
func testChunkQuerierOOOQuery(t *testing.T,
|
||||
appendFunc func(app storage.Appender, ts int64, counterReset bool) (storage.SeriesRef, error),
|
||||
sampleFunc func(ts int64) chunks.Sample,
|
||||
checkInUseBuckets bool,
|
||||
) {
|
||||
opts := DefaultOptions()
|
||||
opts.OutOfOrderCapMax = 30
|
||||
|
@ -6008,10 +6040,28 @@ func testChunkQuerierOOOQuery(t *testing.T,
|
|||
it := chunk.Chunk.Iterator(nil)
|
||||
smpls, err := storage.ExpandSamples(it, newSample)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Verify that no sample is outside the chunk's time range.
|
||||
for i, s := range smpls {
|
||||
switch i {
|
||||
case 0:
|
||||
require.Equal(t, chunk.MinTime, s.T(), "first sample %v not at chunk min time %v", s, chunk.MinTime)
|
||||
case len(smpls) - 1:
|
||||
require.Equal(t, chunk.MaxTime, s.T(), "last sample %v not at chunk max time %v", s, chunk.MaxTime)
|
||||
default:
|
||||
require.GreaterOrEqual(t, s.T(), chunk.MinTime, "sample %v before chunk min time %v", s, chunk.MinTime)
|
||||
require.LessOrEqual(t, s.T(), chunk.MaxTime, "sample %v after chunk max time %v", s, chunk.MaxTime)
|
||||
}
|
||||
}
|
||||
|
||||
gotSamples = append(gotSamples, smpls...)
|
||||
require.NoError(t, it.Err())
|
||||
}
|
||||
requireEqualSamples(t, series1.String(), expSamples, gotSamples, true)
|
||||
if checkInUseBuckets {
|
||||
requireEqualSamples(t, series1.String(), expSamples, gotSamples, requireEqualSamplesIgnoreCounterResets, requireEqualSamplesInUseBucketCompare)
|
||||
} else {
|
||||
requireEqualSamples(t, series1.String(), expSamples, gotSamples, requireEqualSamplesIgnoreCounterResets)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
|
@ -5178,7 +5178,7 @@ func testWBLReplay(t *testing.T, scenario sampleTypeScenario) {
|
|||
// Passing in true for the 'ignoreCounterResets' parameter prevents differences in counter reset headers
|
||||
// from being factored in to the sample comparison
|
||||
// TODO(fionaliao): understand counter reset behaviour, might want to modify this later
|
||||
requireEqualSamples(t, l.String(), expOOOSamples, actOOOSamples, true)
|
||||
requireEqualSamples(t, l.String(), expOOOSamples, actOOOSamples, requireEqualSamplesIgnoreCounterResets)
|
||||
|
||||
require.NoError(t, h.Close())
|
||||
}
|
||||
|
|
|
@ -878,7 +878,7 @@ func testOOOHeadChunkReader_Chunk(t *testing.T, scenario sampleTypeScenario) {
|
|||
}
|
||||
resultSamples, err := storage.ExpandSamples(it, nil)
|
||||
require.NoError(t, err)
|
||||
requireEqualSamples(t, s1.String(), tc.expChunksSamples[i], resultSamples, true)
|
||||
requireEqualSamples(t, s1.String(), tc.expChunksSamples[i], resultSamples, requireEqualSamplesIgnoreCounterResets)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
@ -1054,7 +1054,7 @@ func testOOOHeadChunkReader_Chunk_ConsistentQueryResponseDespiteOfHeadExpanding(
|
|||
it := iterable.Iterator(nil)
|
||||
resultSamples, err := storage.ExpandSamples(it, nil)
|
||||
require.NoError(t, err)
|
||||
requireEqualSamples(t, s1.String(), tc.expChunksSamples[i], resultSamples, true)
|
||||
requireEqualSamples(t, s1.String(), tc.expChunksSamples[i], resultSamples, requireEqualSamplesIgnoreCounterResets)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
|
|
@ -1022,9 +1022,9 @@ func (p *populateWithDelChunkSeriesIterator) populateChunksFromIterable() bool {
|
|||
if newChunk != nil {
|
||||
if !recoded {
|
||||
p.chunksFromIterable = append(p.chunksFromIterable, chunks.Meta{Chunk: currentChunk, MinTime: cmint, MaxTime: cmaxt})
|
||||
cmint = t
|
||||
}
|
||||
currentChunk = newChunk
|
||||
cmint = t
|
||||
}
|
||||
|
||||
cmaxt = t
|
||||
|
|
|
@ -111,7 +111,7 @@ func requireEqualSeries(t *testing.T, expected, actual map[string][]chunks.Sampl
|
|||
for name, expectedItem := range expected {
|
||||
actualItem, ok := actual[name]
|
||||
require.True(t, ok, "Expected series %s not found", name)
|
||||
requireEqualSamples(t, name, expectedItem, actualItem, ignoreCounterResets)
|
||||
requireEqualSamples(t, name, expectedItem, actualItem, requireEqualSamplesIgnoreCounterResets)
|
||||
}
|
||||
for name := range actual {
|
||||
_, ok := expected[name]
|
||||
|
@ -126,7 +126,28 @@ func requireEqualOOOSamples(t *testing.T, expectedSamples int, db *DB) {
|
|||
"number of ooo appended samples mismatch")
|
||||
}
|
||||
|
||||
func requireEqualSamples(t *testing.T, name string, expected, actual []chunks.Sample, ignoreCounterResets bool) {
|
||||
type requireEqualSamplesOption int
|
||||
|
||||
const (
|
||||
requireEqualSamplesNoOption requireEqualSamplesOption = iota
|
||||
requireEqualSamplesIgnoreCounterResets
|
||||
requireEqualSamplesInUseBucketCompare
|
||||
)
|
||||
|
||||
func requireEqualSamples(t *testing.T, name string, expected, actual []chunks.Sample, options ...requireEqualSamplesOption) {
|
||||
var (
|
||||
ignoreCounterResets bool
|
||||
inUseBucketCompare bool
|
||||
)
|
||||
for _, option := range options {
|
||||
switch option {
|
||||
case requireEqualSamplesIgnoreCounterResets:
|
||||
ignoreCounterResets = true
|
||||
case requireEqualSamplesInUseBucketCompare:
|
||||
inUseBucketCompare = true
|
||||
}
|
||||
}
|
||||
|
||||
require.Equal(t, len(expected), len(actual), "Length not equal to expected for %s", name)
|
||||
for i, s := range expected {
|
||||
expectedSample := s
|
||||
|
@ -144,6 +165,10 @@ func requireEqualSamples(t *testing.T, name string, expected, actual []chunks.Sa
|
|||
} else {
|
||||
require.Equal(t, expectedHist.CounterResetHint, actualHist.CounterResetHint, "Sample header doesn't match for %s[%d] at ts %d, expected: %s, actual: %s", name, i, expectedSample.T(), counterResetAsString(expectedHist.CounterResetHint), counterResetAsString(actualHist.CounterResetHint))
|
||||
}
|
||||
if inUseBucketCompare {
|
||||
expectedSample.H().Compact(0)
|
||||
actualSample.H().Compact(0)
|
||||
}
|
||||
require.Equal(t, expectedHist, actualHist, "Sample doesn't match for %s[%d] at ts %d", name, i, expectedSample.T())
|
||||
}
|
||||
case s.FH() != nil:
|
||||
|
@ -156,6 +181,10 @@ func requireEqualSamples(t *testing.T, name string, expected, actual []chunks.Sa
|
|||
} else {
|
||||
require.Equal(t, expectedHist.CounterResetHint, actualHist.CounterResetHint, "Sample header doesn't match for %s[%d] at ts %d, expected: %s, actual: %s", name, i, expectedSample.T(), counterResetAsString(expectedHist.CounterResetHint), counterResetAsString(actualHist.CounterResetHint))
|
||||
}
|
||||
if inUseBucketCompare {
|
||||
expectedSample.FH().Compact(0)
|
||||
actualSample.FH().Compact(0)
|
||||
}
|
||||
require.Equal(t, expectedHist, actualHist, "Sample doesn't match for %s[%d] at ts %d", name, i, expectedSample.T())
|
||||
}
|
||||
default:
|
||||
|
|
Loading…
Reference in a new issue