From 6976e003cb066ba4aaf8e6794e3d49951e1bd792 Mon Sep 17 00:00:00 2001 From: Charles Korn Date: Wed, 11 Oct 2023 15:16:17 +1100 Subject: [PATCH 1/3] Fix issue where consumers of `chainSampleIterator.Seek()` can ignore errors Signed-off-by: Charles Korn --- storage/merge.go | 7 ++++++- storage/merge_test.go | 42 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 48 insertions(+), 1 deletion(-) diff --git a/storage/merge.go b/storage/merge.go index 452c891437..0b7b33078c 100644 --- a/storage/merge.go +++ b/storage/merge.go @@ -497,7 +497,12 @@ func (c *chainSampleIterator) Seek(t int64) chunkenc.ValueType { c.consecutive = false c.h = samplesIteratorHeap{} for _, iter := range c.iterators { - if iter.Seek(t) != chunkenc.ValNone { + if iter.Seek(t) == chunkenc.ValNone { + if iter.Err() != nil { + // If any iterator is reporting an error, abort. + return chunkenc.ValNone + } + } else { heap.Push(&c.h, iter) } } diff --git a/storage/merge_test.go b/storage/merge_test.go index 4252852376..0f61a906c3 100644 --- a/storage/merge_test.go +++ b/storage/merge_test.go @@ -1144,6 +1144,16 @@ func TestChainSampleIteratorSeek(t *testing.T) { } } +func TestChainSampleIteratorSeekFailingIterator(t *testing.T) { + merged := ChainSampleIteratorFromIterators(nil, []chunkenc.Iterator{ + NewListSeriesIterator(samples{fSample{0, 0.1}, fSample{1, 1.1}, fSample{2, 2.1}}), + errIterator{errors.New("something went wrong")}, + }) + + require.Equal(t, chunkenc.ValNone, merged.Seek(0)) + require.EqualError(t, merged.Err(), "something went wrong") +} + func TestChainSampleIteratorSeekHistogramCounterResetHint(t *testing.T) { for sampleType, sampleFunc := range map[string]func(int64, histogram.CounterResetHint) chunks.Sample{ "histogram": func(ts int64, hint histogram.CounterResetHint) chunks.Sample { return histogramSample(ts, hint) }, @@ -1539,3 +1549,35 @@ func TestMergeGenericQuerierWithSecondaries_ErrorHandling(t *testing.T) { }) } } + +type errIterator struct { + err error +} + +func (e errIterator) Next() chunkenc.ValueType { + return chunkenc.ValNone +} + +func (e errIterator) Seek(t int64) chunkenc.ValueType { + return chunkenc.ValNone +} + +func (e errIterator) At() (int64, float64) { + return 0, 0 +} + +func (e errIterator) AtHistogram() (int64, *histogram.Histogram) { + return 0, nil +} + +func (e errIterator) AtFloatHistogram() (int64, *histogram.FloatHistogram) { + return 0, nil +} + +func (e errIterator) AtT() int64 { + return 0 +} + +func (e errIterator) Err() error { + return e.err +} From 4bf8ec202c3b0305b222bc711634112b96d2e577 Mon Sep 17 00:00:00 2001 From: Charles Korn Date: Thu, 12 Oct 2023 11:57:39 +1100 Subject: [PATCH 2/3] Fix similar issue for Next() as well. Signed-off-by: Charles Korn --- storage/merge.go | 11 ++++++++++- storage/merge_test.go | 19 +++++++++++++++++++ 2 files changed, 29 insertions(+), 1 deletion(-) diff --git a/storage/merge.go b/storage/merge.go index 0b7b33078c..43a7f5d334 100644 --- a/storage/merge.go +++ b/storage/merge.go @@ -576,7 +576,13 @@ func (c *chainSampleIterator) Next() chunkenc.ValueType { // So, we don't call Next() on it here. c.curr = c.iterators[0] for _, iter := range c.iterators[1:] { - if iter.Next() != chunkenc.ValNone { + if iter.Next() == chunkenc.ValNone { + if iter.Err() != nil { + // If any iterator is reporting an error, abort. + // If c.iterators[0] is reporting an error, we'll handle that below. + return chunkenc.ValNone + } + } else { heap.Push(&c.h, iter) } } @@ -608,6 +614,9 @@ func (c *chainSampleIterator) Next() chunkenc.ValueType { } // Current iterator does not hold the smallest timestamp. heap.Push(&c.h, c.curr) + } else if c.curr.Err() != nil { + // Abort if we've hit an error. + return chunkenc.ValNone } else if len(c.h) == 0 { // No iterator left to iterate. c.curr = nil diff --git a/storage/merge_test.go b/storage/merge_test.go index 0f61a906c3..0564913a8e 100644 --- a/storage/merge_test.go +++ b/storage/merge_test.go @@ -1154,6 +1154,25 @@ func TestChainSampleIteratorSeekFailingIterator(t *testing.T) { require.EqualError(t, merged.Err(), "something went wrong") } +func TestChainSampleIteratorNextImmediatelyFailingIterator(t *testing.T) { + merged := ChainSampleIteratorFromIterators(nil, []chunkenc.Iterator{ + NewListSeriesIterator(samples{fSample{0, 0.1}, fSample{1, 1.1}, fSample{2, 2.1}}), + errIterator{errors.New("something went wrong")}, + }) + + require.Equal(t, chunkenc.ValNone, merged.Next()) + require.EqualError(t, merged.Err(), "something went wrong") + + // Next() does some special handling for the first iterator, so make sure it handles the first iterator returning an error too. + merged = ChainSampleIteratorFromIterators(nil, []chunkenc.Iterator{ + errIterator{errors.New("something went wrong")}, + NewListSeriesIterator(samples{fSample{0, 0.1}, fSample{1, 1.1}, fSample{2, 2.1}}), + }) + + require.Equal(t, chunkenc.ValNone, merged.Next()) + require.EqualError(t, merged.Err(), "something went wrong") +} + func TestChainSampleIteratorSeekHistogramCounterResetHint(t *testing.T) { for sampleType, sampleFunc := range map[string]func(int64, histogram.CounterResetHint) chunks.Sample{ "histogram": func(ts int64, hint histogram.CounterResetHint) chunks.Sample { return histogramSample(ts, hint) }, From 6c45249fb50d700e8da79cf1a4511ab4ffa1f7ba Mon Sep 17 00:00:00 2001 From: Charles Korn Date: Thu, 12 Oct 2023 12:29:44 +1100 Subject: [PATCH 3/3] Make the linter happy --- storage/merge.go | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/storage/merge.go b/storage/merge.go index 43a7f5d334..0db63c992c 100644 --- a/storage/merge.go +++ b/storage/merge.go @@ -594,7 +594,17 @@ func (c *chainSampleIterator) Next() chunkenc.ValueType { for { currValueType = c.curr.Next() - if currValueType != chunkenc.ValNone { + + if currValueType == chunkenc.ValNone { + if c.curr.Err() != nil { + // Abort if we've hit an error. + return chunkenc.ValNone + } else if len(c.h) == 0 { + // No iterator left to iterate. + c.curr = nil + return chunkenc.ValNone + } + } else { currT = c.curr.AtT() if currT == c.lastT { // Ignoring sample for the same timestamp. @@ -614,13 +624,6 @@ func (c *chainSampleIterator) Next() chunkenc.ValueType { } // Current iterator does not hold the smallest timestamp. heap.Push(&c.h, c.curr) - } else if c.curr.Err() != nil { - // Abort if we've hit an error. - return chunkenc.ValNone - } else if len(c.h) == 0 { - // No iterator left to iterate. - c.curr = nil - return chunkenc.ValNone } c.curr = heap.Pop(&c.h).(chunkenc.Iterator)