From 5184368db6bf6c1e92c87eedc5c2617f3b4d84e4 Mon Sep 17 00:00:00 2001 From: Charles Korn Date: Wed, 1 Nov 2023 09:35:17 +1100 Subject: [PATCH] Fix issue where `chainSampleIterator` can obscure errors (#13006) * Fix issue where `chainSampleIterator` can obscure errors Signed-off-by: Charles Korn * Address PR feedback. Signed-off-by: Charles Korn --------- Signed-off-by: Charles Korn --- storage/merge.go | 35 +++++++++++++++++++------ storage/merge_test.go | 61 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 88 insertions(+), 8 deletions(-) diff --git a/storage/merge.go b/storage/merge.go index 1e29374e5..50ae88ce0 100644 --- a/storage/merge.go +++ b/storage/merge.go @@ -497,9 +497,14 @@ func (c *chainSampleIterator) Seek(t int64) chunkenc.ValueType { c.consecutive = false c.h = samplesIteratorHeap{} for _, iter := range c.iterators { - if iter.Seek(t) != chunkenc.ValNone { - heap.Push(&c.h, iter) + if iter.Seek(t) == chunkenc.ValNone { + if iter.Err() != nil { + // If any iterator is reporting an error, abort. + return chunkenc.ValNone + } + continue } + heap.Push(&c.h, iter) } if len(c.h) > 0 { c.curr = heap.Pop(&c.h).(chunkenc.Iterator) @@ -571,7 +576,13 @@ func (c *chainSampleIterator) Next() chunkenc.ValueType { // So, we don't call Next() on it here. c.curr = c.iterators[0] for _, iter := range c.iterators[1:] { - if iter.Next() != chunkenc.ValNone { + if iter.Next() == chunkenc.ValNone { + if iter.Err() != nil { + // If any iterator is reporting an error, abort. + // If c.iterators[0] is reporting an error, we'll handle that below. + return chunkenc.ValNone + } + } else { heap.Push(&c.h, iter) } } @@ -583,7 +594,19 @@ func (c *chainSampleIterator) Next() chunkenc.ValueType { for { currValueType = c.curr.Next() - if currValueType != chunkenc.ValNone { + + if currValueType == chunkenc.ValNone { + if c.curr.Err() != nil { + // Abort if we've hit an error. + return chunkenc.ValNone + } + + if len(c.h) == 0 { + // No iterator left to iterate. + c.curr = nil + return chunkenc.ValNone + } + } else { currT = c.curr.AtT() if currT == c.lastT { // Ignoring sample for the same timestamp. @@ -603,10 +626,6 @@ func (c *chainSampleIterator) Next() chunkenc.ValueType { } // Current iterator does not hold the smallest timestamp. heap.Push(&c.h, c.curr) - } else if len(c.h) == 0 { - // No iterator left to iterate. - c.curr = nil - return chunkenc.ValNone } c.curr = heap.Pop(&c.h).(chunkenc.Iterator) diff --git a/storage/merge_test.go b/storage/merge_test.go index d1b36e4fb..f68261d27 100644 --- a/storage/merge_test.go +++ b/storage/merge_test.go @@ -1129,6 +1129,35 @@ func TestChainSampleIteratorSeek(t *testing.T) { } } +func TestChainSampleIteratorSeekFailingIterator(t *testing.T) { + merged := ChainSampleIteratorFromIterators(nil, []chunkenc.Iterator{ + NewListSeriesIterator(samples{fSample{0, 0.1}, fSample{1, 1.1}, fSample{2, 2.1}}), + errIterator{errors.New("something went wrong")}, + }) + + require.Equal(t, chunkenc.ValNone, merged.Seek(0)) + require.EqualError(t, merged.Err(), "something went wrong") +} + +func TestChainSampleIteratorNextImmediatelyFailingIterator(t *testing.T) { + merged := ChainSampleIteratorFromIterators(nil, []chunkenc.Iterator{ + NewListSeriesIterator(samples{fSample{0, 0.1}, fSample{1, 1.1}, fSample{2, 2.1}}), + errIterator{errors.New("something went wrong")}, + }) + + require.Equal(t, chunkenc.ValNone, merged.Next()) + require.EqualError(t, merged.Err(), "something went wrong") + + // Next() does some special handling for the first iterator, so make sure it handles the first iterator returning an error too. + merged = ChainSampleIteratorFromIterators(nil, []chunkenc.Iterator{ + errIterator{errors.New("something went wrong")}, + NewListSeriesIterator(samples{fSample{0, 0.1}, fSample{1, 1.1}, fSample{2, 2.1}}), + }) + + require.Equal(t, chunkenc.ValNone, merged.Next()) + require.EqualError(t, merged.Err(), "something went wrong") +} + func TestChainSampleIteratorSeekHistogramCounterResetHint(t *testing.T) { for sampleType, sampleFunc := range map[string]func(int64, histogram.CounterResetHint) chunks.Sample{ "histogram": func(ts int64, hint histogram.CounterResetHint) chunks.Sample { return histogramSample(ts, hint) }, @@ -1524,3 +1553,35 @@ func TestMergeGenericQuerierWithSecondaries_ErrorHandling(t *testing.T) { }) } } + +type errIterator struct { + err error +} + +func (e errIterator) Next() chunkenc.ValueType { + return chunkenc.ValNone +} + +func (e errIterator) Seek(t int64) chunkenc.ValueType { + return chunkenc.ValNone +} + +func (e errIterator) At() (int64, float64) { + return 0, 0 +} + +func (e errIterator) AtHistogram() (int64, *histogram.Histogram) { + return 0, nil +} + +func (e errIterator) AtFloatHistogram() (int64, *histogram.FloatHistogram) { + return 0, nil +} + +func (e errIterator) AtT() int64 { + return 0 +} + +func (e errIterator) Err() error { + return e.err +}