Merge pull request #540 from grafana/charleskorn/chainsampleiterator-bug

Fix issue where `chainSampleIterator` can obscure errors
This commit is contained in:
Charles Korn 2023-10-12 12:47:31 +11:00 committed by GitHub
commit f477e1b21c
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 85 additions and 7 deletions

View file

@ -497,7 +497,12 @@ func (c *chainSampleIterator) Seek(t int64) chunkenc.ValueType {
c.consecutive = false
c.h = samplesIteratorHeap{}
for _, iter := range c.iterators {
if iter.Seek(t) != chunkenc.ValNone {
if iter.Seek(t) == chunkenc.ValNone {
if iter.Err() != nil {
// If any iterator is reporting an error, abort.
return chunkenc.ValNone
}
} else {
heap.Push(&c.h, iter)
}
}
@ -571,7 +576,13 @@ func (c *chainSampleIterator) Next() chunkenc.ValueType {
// So, we don't call Next() on it here.
c.curr = c.iterators[0]
for _, iter := range c.iterators[1:] {
if iter.Next() != chunkenc.ValNone {
if iter.Next() == chunkenc.ValNone {
if iter.Err() != nil {
// If any iterator is reporting an error, abort.
// If c.iterators[0] is reporting an error, we'll handle that below.
return chunkenc.ValNone
}
} else {
heap.Push(&c.h, iter)
}
}
@ -583,7 +594,17 @@ func (c *chainSampleIterator) Next() chunkenc.ValueType {
for {
currValueType = c.curr.Next()
if currValueType != chunkenc.ValNone {
if currValueType == chunkenc.ValNone {
if c.curr.Err() != nil {
// Abort if we've hit an error.
return chunkenc.ValNone
} else if len(c.h) == 0 {
// No iterator left to iterate.
c.curr = nil
return chunkenc.ValNone
}
} else {
currT = c.curr.AtT()
if currT == c.lastT {
// Ignoring sample for the same timestamp.
@ -603,10 +624,6 @@ func (c *chainSampleIterator) Next() chunkenc.ValueType {
}
// Current iterator does not hold the smallest timestamp.
heap.Push(&c.h, c.curr)
} else if len(c.h) == 0 {
// No iterator left to iterate.
c.curr = nil
return chunkenc.ValNone
}
c.curr = heap.Pop(&c.h).(chunkenc.Iterator)

View file

@ -1144,6 +1144,35 @@ func TestChainSampleIteratorSeek(t *testing.T) {
}
}
func TestChainSampleIteratorSeekFailingIterator(t *testing.T) {
merged := ChainSampleIteratorFromIterators(nil, []chunkenc.Iterator{
NewListSeriesIterator(samples{fSample{0, 0.1}, fSample{1, 1.1}, fSample{2, 2.1}}),
errIterator{errors.New("something went wrong")},
})
require.Equal(t, chunkenc.ValNone, merged.Seek(0))
require.EqualError(t, merged.Err(), "something went wrong")
}
func TestChainSampleIteratorNextImmediatelyFailingIterator(t *testing.T) {
merged := ChainSampleIteratorFromIterators(nil, []chunkenc.Iterator{
NewListSeriesIterator(samples{fSample{0, 0.1}, fSample{1, 1.1}, fSample{2, 2.1}}),
errIterator{errors.New("something went wrong")},
})
require.Equal(t, chunkenc.ValNone, merged.Next())
require.EqualError(t, merged.Err(), "something went wrong")
// Next() does some special handling for the first iterator, so make sure it handles the first iterator returning an error too.
merged = ChainSampleIteratorFromIterators(nil, []chunkenc.Iterator{
errIterator{errors.New("something went wrong")},
NewListSeriesIterator(samples{fSample{0, 0.1}, fSample{1, 1.1}, fSample{2, 2.1}}),
})
require.Equal(t, chunkenc.ValNone, merged.Next())
require.EqualError(t, merged.Err(), "something went wrong")
}
func TestChainSampleIteratorSeekHistogramCounterResetHint(t *testing.T) {
for sampleType, sampleFunc := range map[string]func(int64, histogram.CounterResetHint) chunks.Sample{
"histogram": func(ts int64, hint histogram.CounterResetHint) chunks.Sample { return histogramSample(ts, hint) },
@ -1539,3 +1568,35 @@ func TestMergeGenericQuerierWithSecondaries_ErrorHandling(t *testing.T) {
})
}
}
type errIterator struct {
err error
}
func (e errIterator) Next() chunkenc.ValueType {
return chunkenc.ValNone
}
func (e errIterator) Seek(t int64) chunkenc.ValueType {
return chunkenc.ValNone
}
func (e errIterator) At() (int64, float64) {
return 0, 0
}
func (e errIterator) AtHistogram() (int64, *histogram.Histogram) {
return 0, nil
}
func (e errIterator) AtFloatHistogram() (int64, *histogram.FloatHistogram) {
return 0, nil
}
func (e errIterator) AtT() int64 {
return 0
}
func (e errIterator) Err() error {
return e.err
}