mirror of
https://github.com/prometheus/prometheus.git
synced 2025-03-05 20:59:13 -08:00
Fix issue where chainSampleIterator
can obscure errors (#13006)
* Fix issue where `chainSampleIterator` can obscure errors Signed-off-by: Charles Korn <charles.korn@grafana.com> * Address PR feedback. Signed-off-by: Charles Korn <charles.korn@grafana.com> --------- Signed-off-by: Charles Korn <charles.korn@grafana.com>
This commit is contained in:
parent
12dd0cb53b
commit
5184368db6
|
@ -497,9 +497,14 @@ func (c *chainSampleIterator) Seek(t int64) chunkenc.ValueType {
|
||||||
c.consecutive = false
|
c.consecutive = false
|
||||||
c.h = samplesIteratorHeap{}
|
c.h = samplesIteratorHeap{}
|
||||||
for _, iter := range c.iterators {
|
for _, iter := range c.iterators {
|
||||||
if iter.Seek(t) != chunkenc.ValNone {
|
if iter.Seek(t) == chunkenc.ValNone {
|
||||||
heap.Push(&c.h, iter)
|
if iter.Err() != nil {
|
||||||
|
// If any iterator is reporting an error, abort.
|
||||||
|
return chunkenc.ValNone
|
||||||
|
}
|
||||||
|
continue
|
||||||
}
|
}
|
||||||
|
heap.Push(&c.h, iter)
|
||||||
}
|
}
|
||||||
if len(c.h) > 0 {
|
if len(c.h) > 0 {
|
||||||
c.curr = heap.Pop(&c.h).(chunkenc.Iterator)
|
c.curr = heap.Pop(&c.h).(chunkenc.Iterator)
|
||||||
|
@ -571,7 +576,13 @@ func (c *chainSampleIterator) Next() chunkenc.ValueType {
|
||||||
// So, we don't call Next() on it here.
|
// So, we don't call Next() on it here.
|
||||||
c.curr = c.iterators[0]
|
c.curr = c.iterators[0]
|
||||||
for _, iter := range c.iterators[1:] {
|
for _, iter := range c.iterators[1:] {
|
||||||
if iter.Next() != chunkenc.ValNone {
|
if iter.Next() == chunkenc.ValNone {
|
||||||
|
if iter.Err() != nil {
|
||||||
|
// If any iterator is reporting an error, abort.
|
||||||
|
// If c.iterators[0] is reporting an error, we'll handle that below.
|
||||||
|
return chunkenc.ValNone
|
||||||
|
}
|
||||||
|
} else {
|
||||||
heap.Push(&c.h, iter)
|
heap.Push(&c.h, iter)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -583,7 +594,19 @@ func (c *chainSampleIterator) Next() chunkenc.ValueType {
|
||||||
|
|
||||||
for {
|
for {
|
||||||
currValueType = c.curr.Next()
|
currValueType = c.curr.Next()
|
||||||
if currValueType != chunkenc.ValNone {
|
|
||||||
|
if currValueType == chunkenc.ValNone {
|
||||||
|
if c.curr.Err() != nil {
|
||||||
|
// Abort if we've hit an error.
|
||||||
|
return chunkenc.ValNone
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(c.h) == 0 {
|
||||||
|
// No iterator left to iterate.
|
||||||
|
c.curr = nil
|
||||||
|
return chunkenc.ValNone
|
||||||
|
}
|
||||||
|
} else {
|
||||||
currT = c.curr.AtT()
|
currT = c.curr.AtT()
|
||||||
if currT == c.lastT {
|
if currT == c.lastT {
|
||||||
// Ignoring sample for the same timestamp.
|
// Ignoring sample for the same timestamp.
|
||||||
|
@ -603,10 +626,6 @@ func (c *chainSampleIterator) Next() chunkenc.ValueType {
|
||||||
}
|
}
|
||||||
// Current iterator does not hold the smallest timestamp.
|
// Current iterator does not hold the smallest timestamp.
|
||||||
heap.Push(&c.h, c.curr)
|
heap.Push(&c.h, c.curr)
|
||||||
} else if len(c.h) == 0 {
|
|
||||||
// No iterator left to iterate.
|
|
||||||
c.curr = nil
|
|
||||||
return chunkenc.ValNone
|
|
||||||
}
|
}
|
||||||
|
|
||||||
c.curr = heap.Pop(&c.h).(chunkenc.Iterator)
|
c.curr = heap.Pop(&c.h).(chunkenc.Iterator)
|
||||||
|
|
|
@ -1129,6 +1129,35 @@ func TestChainSampleIteratorSeek(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestChainSampleIteratorSeekFailingIterator(t *testing.T) {
|
||||||
|
merged := ChainSampleIteratorFromIterators(nil, []chunkenc.Iterator{
|
||||||
|
NewListSeriesIterator(samples{fSample{0, 0.1}, fSample{1, 1.1}, fSample{2, 2.1}}),
|
||||||
|
errIterator{errors.New("something went wrong")},
|
||||||
|
})
|
||||||
|
|
||||||
|
require.Equal(t, chunkenc.ValNone, merged.Seek(0))
|
||||||
|
require.EqualError(t, merged.Err(), "something went wrong")
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestChainSampleIteratorNextImmediatelyFailingIterator(t *testing.T) {
|
||||||
|
merged := ChainSampleIteratorFromIterators(nil, []chunkenc.Iterator{
|
||||||
|
NewListSeriesIterator(samples{fSample{0, 0.1}, fSample{1, 1.1}, fSample{2, 2.1}}),
|
||||||
|
errIterator{errors.New("something went wrong")},
|
||||||
|
})
|
||||||
|
|
||||||
|
require.Equal(t, chunkenc.ValNone, merged.Next())
|
||||||
|
require.EqualError(t, merged.Err(), "something went wrong")
|
||||||
|
|
||||||
|
// Next() does some special handling for the first iterator, so make sure it handles the first iterator returning an error too.
|
||||||
|
merged = ChainSampleIteratorFromIterators(nil, []chunkenc.Iterator{
|
||||||
|
errIterator{errors.New("something went wrong")},
|
||||||
|
NewListSeriesIterator(samples{fSample{0, 0.1}, fSample{1, 1.1}, fSample{2, 2.1}}),
|
||||||
|
})
|
||||||
|
|
||||||
|
require.Equal(t, chunkenc.ValNone, merged.Next())
|
||||||
|
require.EqualError(t, merged.Err(), "something went wrong")
|
||||||
|
}
|
||||||
|
|
||||||
func TestChainSampleIteratorSeekHistogramCounterResetHint(t *testing.T) {
|
func TestChainSampleIteratorSeekHistogramCounterResetHint(t *testing.T) {
|
||||||
for sampleType, sampleFunc := range map[string]func(int64, histogram.CounterResetHint) chunks.Sample{
|
for sampleType, sampleFunc := range map[string]func(int64, histogram.CounterResetHint) chunks.Sample{
|
||||||
"histogram": func(ts int64, hint histogram.CounterResetHint) chunks.Sample { return histogramSample(ts, hint) },
|
"histogram": func(ts int64, hint histogram.CounterResetHint) chunks.Sample { return histogramSample(ts, hint) },
|
||||||
|
@ -1524,3 +1553,35 @@ func TestMergeGenericQuerierWithSecondaries_ErrorHandling(t *testing.T) {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type errIterator struct {
|
||||||
|
err error
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e errIterator) Next() chunkenc.ValueType {
|
||||||
|
return chunkenc.ValNone
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e errIterator) Seek(t int64) chunkenc.ValueType {
|
||||||
|
return chunkenc.ValNone
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e errIterator) At() (int64, float64) {
|
||||||
|
return 0, 0
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e errIterator) AtHistogram() (int64, *histogram.Histogram) {
|
||||||
|
return 0, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e errIterator) AtFloatHistogram() (int64, *histogram.FloatHistogram) {
|
||||||
|
return 0, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e errIterator) AtT() int64 {
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e errIterator) Err() error {
|
||||||
|
return e.err
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in a new issue