diff --git a/querier.go b/querier.go index 0e0be2f6f..6d9f022a4 100644 --- a/querier.go +++ b/querier.go @@ -674,27 +674,25 @@ func (b *BufferedSeriesIterator) Seek(t int64) bool { // If the delta would cause us to seek backwards, preserve the buffer // and just continue regular advancment while filling the buffer on the way. - if t0 <= b.lastTime { - for b.Next() { - if tcur, _ := b.it.Values(); tcur >= t { - return true - } + if t0 > b.lastTime { + b.buf.reset() + + ok := b.it.Seek(t0) + if !ok { + return false } - return false + b.lastTime, _ = b.Values() } - b.buf.reset() - - ok := b.it.Seek(t0) - if !ok { - return false + if b.lastTime >= t { + return true } - for b.Next() { - if ts, _ := b.Values(); ts >= t { + if b.lastTime >= t { return true } } + return false } @@ -704,7 +702,9 @@ func (b *BufferedSeriesIterator) Next() bool { b.buf.add(b.it.Values()) ok := b.it.Next() - b.lastTime, _ = b.Values() + if ok { + b.lastTime, _ = b.Values() + } return ok } diff --git a/querier_test.go b/querier_test.go index ed2dcbf23..86d3e310c 100644 --- a/querier_test.go +++ b/querier_test.go @@ -49,11 +49,15 @@ func (it *listSeriesIterator) Next() bool { } func (it *listSeriesIterator) Seek(t int64) bool { + if it.idx == -1 { + it.idx = 0 + } // Do binary search between current position and end. it.idx = sort.Search(len(it.list)-it.idx, func(i int) bool { s := it.list[i+it.idx] return s.t >= t }) + return it.idx < len(it.list) } @@ -283,3 +287,57 @@ func TestSampleRing(t *testing.T) { } } } + +func TestBufferedSeriesIterator(t *testing.T) { + var it *BufferedSeriesIterator + + bufferEq := func(exp []sample) { + var b []sample + bit := it.Buffer() + for bit.Next() { + t, v := bit.Values() + b = append(b, sample{t: t, v: v}) + } + require.Equal(t, exp, b, "buffer mismatch") + } + sampleEq := func(ets int64, ev float64) { + ts, v := it.Values() + require.Equal(t, ets, ts, "timestamp mismatch") + require.Equal(t, ev, v, "value mismatch") + } + + it = NewBuffer(newListSeriesIterator([]sample{ + {t: 1, v: 2}, + {t: 2, v: 3}, + {t: 3, v: 4}, + {t: 4, v: 5}, + {t: 5, v: 6}, + {t: 99, v: 8}, + {t: 100, v: 9}, + {t: 101, v: 10}, + }), 2) + + require.True(t, it.Seek(-123), "seek failed") + sampleEq(1, 2) + bufferEq(nil) + + require.True(t, it.Next(), "next failed") + sampleEq(2, 3) + bufferEq([]sample{{t: 1, v: 2}}) + + require.True(t, it.Next(), "next failed") + require.True(t, it.Next(), "next failed") + require.True(t, it.Next(), "next failed") + sampleEq(5, 6) + bufferEq([]sample{{t: 2, v: 3}, {t: 3, v: 4}, {t: 4, v: 5}}) + + require.True(t, it.Seek(5), "seek failed") + sampleEq(5, 6) + bufferEq([]sample{{t: 2, v: 3}, {t: 3, v: 4}, {t: 4, v: 5}}) + + require.True(t, it.Seek(101), "seek failed") + sampleEq(101, 10) + bufferEq([]sample{{t: 99, v: 8}, {t: 100, v: 9}}) + + require.False(t, it.Next(), "next succeeded unexpectedly") +}