diff --git a/querier.go b/querier.go index d32cbc16a..3ee90dd6f 100644 --- a/querier.go +++ b/querier.go @@ -2,7 +2,6 @@ package tsdb import ( "fmt" - "math" "sort" "strings" @@ -605,202 +604,6 @@ func (it *chunkSeriesIterator) Err() error { return it.cur.Err() } -// BufferedSeriesIterator wraps an iterator with a look-back buffer. -type BufferedSeriesIterator struct { - it SeriesIterator - buf *sampleRing - - lastTime int64 -} - -// NewBuffer returns a new iterator that buffers the values within the time range -// of the current element and the duration of delta before. -func NewBuffer(it SeriesIterator, delta int64) *BufferedSeriesIterator { - return &BufferedSeriesIterator{ - it: it, - buf: newSampleRing(delta, 16), - lastTime: math.MinInt64, - } -} - -// PeekBack returns the previous element of the iterator. If there is none buffered, -// ok is false. -func (b *BufferedSeriesIterator) PeekBack() (t int64, v float64, ok bool) { - return b.buf.last() -} - -// Buffer returns an iterator over the buffered data. -func (b *BufferedSeriesIterator) Buffer() SeriesIterator { - return b.buf.iterator() -} - -// Seek advances the iterator to the element at time t or greater. -func (b *BufferedSeriesIterator) Seek(t int64) bool { - t0 := t - b.buf.delta - - // If the delta would cause us to seek backwards, preserve the buffer - // and just continue regular advancment while filling the buffer on the way. - if t0 > b.lastTime { - b.buf.reset() - - ok := b.it.Seek(t0) - if !ok { - return false - } - b.lastTime, _ = b.At() - } - - if b.lastTime >= t { - return true - } - for b.Next() { - if b.lastTime >= t { - return true - } - } - - return false -} - -// Next advances the iterator to the next element. -func (b *BufferedSeriesIterator) Next() bool { - // Add current element to buffer before advancing. - b.buf.add(b.it.At()) - - ok := b.it.Next() - if ok { - b.lastTime, _ = b.At() - } - return ok -} - -// At returns the current element of the iterator. -func (b *BufferedSeriesIterator) At() (int64, float64) { - return b.it.At() -} - -// Err returns the last encountered error. -func (b *BufferedSeriesIterator) Err() error { - return b.it.Err() -} - -type sample struct { - t int64 - v float64 -} - -type sampleRing struct { - delta int64 - - buf []sample // lookback buffer - i int // position of most recent element in ring buffer - f int // position of first element in ring buffer - l int // number of elements in buffer -} - -func newSampleRing(delta int64, sz int) *sampleRing { - r := &sampleRing{delta: delta, buf: make([]sample, sz)} - r.reset() - - return r -} - -func (r *sampleRing) reset() { - r.l = 0 - r.i = -1 - r.f = 0 -} - -func (r *sampleRing) iterator() SeriesIterator { - return &sampleRingIterator{r: r, i: -1} -} - -type sampleRingIterator struct { - r *sampleRing - i int -} - -func (it *sampleRingIterator) Next() bool { - it.i++ - return it.i < it.r.l -} - -func (it *sampleRingIterator) Seek(int64) bool { - return false -} - -func (it *sampleRingIterator) Err() error { - return nil -} - -func (it *sampleRingIterator) At() (int64, float64) { - return it.r.at(it.i) -} - -func (r *sampleRing) at(i int) (int64, float64) { - j := (r.f + i) % len(r.buf) - s := r.buf[j] - return s.t, s.v -} - -// add adds a sample to the ring buffer and frees all samples that fall -// out of the delta range. -func (r *sampleRing) add(t int64, v float64) { - l := len(r.buf) - // Grow the ring buffer if it fits no more elements. - if l == r.l { - buf := make([]sample, 2*l) - copy(buf[l+r.f:], r.buf[r.f:]) - copy(buf, r.buf[:r.f]) - - r.buf = buf - r.i = r.f - r.f += l - } else { - r.i++ - if r.i >= l { - r.i -= l - } - } - - r.buf[r.i] = sample{t: t, v: v} - r.l++ - - // Free head of the buffer of samples that just fell out of the range. - for r.buf[r.f].t < t-r.delta { - r.f++ - if r.f >= l { - r.f -= l - } - r.l-- - } -} - -// last returns the most recent element added to the ring. -func (r *sampleRing) last() (int64, float64, bool) { - if r.l == 0 { - return 0, 0, false - } - s := r.buf[r.i] - return s.t, s.v, true -} - -func (r *sampleRing) samples() []sample { - res := make([]sample, r.l) - - var k = r.f + r.l - var j int - if k > len(r.buf) { - k = len(r.buf) - j = r.l - k + r.f - } - - n := copy(res, r.buf[r.f:k]) - copy(res[n:], r.buf[:j]) - - return res -} - type mockSeriesSet struct { next func() bool series func() Series diff --git a/querier_test.go b/querier_test.go index 41fafd5ae..6330df657 100644 --- a/querier_test.go +++ b/querier_test.go @@ -1,7 +1,6 @@ package tsdb import ( - "math/rand" "sort" "testing" @@ -201,118 +200,3 @@ func expandSeriesIterator(it SeriesIterator) (r []sample, err error) { return r, it.Err() } - -func TestSampleRing(t *testing.T) { - cases := []struct { - input []int64 - delta int64 - size int - }{ - { - input: []int64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, - delta: 2, - size: 1, - }, - { - input: []int64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, - delta: 2, - size: 2, - }, - { - input: []int64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, - delta: 7, - size: 3, - }, - { - input: []int64{1, 2, 3, 4, 5, 16, 17, 18, 19, 20}, - delta: 7, - size: 1, - }, - } - for _, c := range cases { - r := newSampleRing(c.delta, c.size) - - input := []sample{} - for _, t := range c.input { - input = append(input, sample{ - t: t, - v: float64(rand.Intn(100)), - }) - } - - for i, s := range input { - r.add(s.t, s.v) - buffered := r.samples() - - for _, sold := range input[:i] { - found := false - for _, bs := range buffered { - if bs.t == sold.t && bs.v == sold.v { - found = true - break - } - } - if sold.t >= s.t-c.delta && !found { - t.Fatalf("%d: expected sample %d to be in buffer but was not; buffer %v", i, sold.t, buffered) - } - if sold.t < s.t-c.delta && found { - t.Fatalf("%d: unexpected sample %d in buffer; buffer %v", i, sold.t, buffered) - } - } - } - } -} - -func TestBufferedSeriesIterator(t *testing.T) { - var it *BufferedSeriesIterator - - bufferEq := func(exp []sample) { - var b []sample - bit := it.Buffer() - for bit.Next() { - t, v := bit.At() - b = append(b, sample{t: t, v: v}) - } - require.Equal(t, exp, b, "buffer mismatch") - } - sampleEq := func(ets int64, ev float64) { - ts, v := it.At() - require.Equal(t, ets, ts, "timestamp mismatch") - require.Equal(t, ev, v, "value mismatch") - } - - it = NewBuffer(newListSeriesIterator([]sample{ - {t: 1, v: 2}, - {t: 2, v: 3}, - {t: 3, v: 4}, - {t: 4, v: 5}, - {t: 5, v: 6}, - {t: 99, v: 8}, - {t: 100, v: 9}, - {t: 101, v: 10}, - }), 2) - - require.True(t, it.Seek(-123), "seek failed") - sampleEq(1, 2) - bufferEq(nil) - - require.True(t, it.Next(), "next failed") - sampleEq(2, 3) - bufferEq([]sample{{t: 1, v: 2}}) - - require.True(t, it.Next(), "next failed") - require.True(t, it.Next(), "next failed") - require.True(t, it.Next(), "next failed") - sampleEq(5, 6) - bufferEq([]sample{{t: 2, v: 3}, {t: 3, v: 4}, {t: 4, v: 5}}) - - require.True(t, it.Seek(5), "seek failed") - sampleEq(5, 6) - bufferEq([]sample{{t: 2, v: 3}, {t: 3, v: 4}, {t: 4, v: 5}}) - - require.True(t, it.Seek(101), "seek failed") - sampleEq(101, 10) - bufferEq([]sample{{t: 99, v: 8}, {t: 100, v: 9}}) - - require.False(t, it.Next(), "next succeeded unexpectedly") -} diff --git a/tsdbutil/buffer.go b/tsdbutil/buffer.go new file mode 100644 index 000000000..da6fa84d6 --- /dev/null +++ b/tsdbutil/buffer.go @@ -0,0 +1,203 @@ +package tsdbutil + +import ( + "math" + + "github.com/fabxc/tsdb" +) + +// BufferedSeriesIterator wraps an iterator with a look-back buffer. +type BufferedSeriesIterator struct { + it tsdb.SeriesIterator + buf *sampleRing + + lastTime int64 +} + +// NewBuffer returns a new iterator that buffers the values within the time range +// of the current element and the duration of delta before. +func NewBuffer(it tsdb.SeriesIterator, delta int64) *BufferedSeriesIterator { + return &BufferedSeriesIterator{ + it: it, + buf: newSampleRing(delta, 16), + lastTime: math.MinInt64, + } +} + +// PeekBack returns the previous element of the iterator. If there is none buffered, +// ok is false. +func (b *BufferedSeriesIterator) PeekBack() (t int64, v float64, ok bool) { + return b.buf.last() +} + +// Buffer returns an iterator over the buffered data. +func (b *BufferedSeriesIterator) Buffer() tsdb.SeriesIterator { + return b.buf.iterator() +} + +// Seek advances the iterator to the element at time t or greater. +func (b *BufferedSeriesIterator) Seek(t int64) bool { + t0 := t - b.buf.delta + + // If the delta would cause us to seek backwards, preserve the buffer + // and just continue regular advancment while filling the buffer on the way. + if t0 > b.lastTime { + b.buf.reset() + + ok := b.it.Seek(t0) + if !ok { + return false + } + b.lastTime, _ = b.At() + } + + if b.lastTime >= t { + return true + } + for b.Next() { + if b.lastTime >= t { + return true + } + } + + return false +} + +// Next advances the iterator to the next element. +func (b *BufferedSeriesIterator) Next() bool { + // Add current element to buffer before advancing. + b.buf.add(b.it.At()) + + ok := b.it.Next() + if ok { + b.lastTime, _ = b.At() + } + return ok +} + +// At returns the current element of the iterator. +func (b *BufferedSeriesIterator) At() (int64, float64) { + return b.it.At() +} + +// Err returns the last encountered error. +func (b *BufferedSeriesIterator) Err() error { + return b.it.Err() +} + +type sample struct { + t int64 + v float64 +} + +type sampleRing struct { + delta int64 + + buf []sample // lookback buffer + i int // position of most recent element in ring buffer + f int // position of first element in ring buffer + l int // number of elements in buffer +} + +func newSampleRing(delta int64, sz int) *sampleRing { + r := &sampleRing{delta: delta, buf: make([]sample, sz)} + r.reset() + + return r +} + +func (r *sampleRing) reset() { + r.l = 0 + r.i = -1 + r.f = 0 +} + +func (r *sampleRing) iterator() tsdb.SeriesIterator { + return &sampleRingIterator{r: r, i: -1} +} + +type sampleRingIterator struct { + r *sampleRing + i int +} + +func (it *sampleRingIterator) Next() bool { + it.i++ + return it.i < it.r.l +} + +func (it *sampleRingIterator) Seek(int64) bool { + return false +} + +func (it *sampleRingIterator) Err() error { + return nil +} + +func (it *sampleRingIterator) At() (int64, float64) { + return it.r.at(it.i) +} + +func (r *sampleRing) at(i int) (int64, float64) { + j := (r.f + i) % len(r.buf) + s := r.buf[j] + return s.t, s.v +} + +// add adds a sample to the ring buffer and frees all samples that fall +// out of the delta range. +func (r *sampleRing) add(t int64, v float64) { + l := len(r.buf) + // Grow the ring buffer if it fits no more elements. + if l == r.l { + buf := make([]sample, 2*l) + copy(buf[l+r.f:], r.buf[r.f:]) + copy(buf, r.buf[:r.f]) + + r.buf = buf + r.i = r.f + r.f += l + } else { + r.i++ + if r.i >= l { + r.i -= l + } + } + + r.buf[r.i] = sample{t: t, v: v} + r.l++ + + // Free head of the buffer of samples that just fell out of the range. + for r.buf[r.f].t < t-r.delta { + r.f++ + if r.f >= l { + r.f -= l + } + r.l-- + } +} + +// last returns the most recent element added to the ring. +func (r *sampleRing) last() (int64, float64, bool) { + if r.l == 0 { + return 0, 0, false + } + s := r.buf[r.i] + return s.t, s.v, true +} + +func (r *sampleRing) samples() []sample { + res := make([]sample, r.l) + + var k = r.f + r.l + var j int + if k > len(r.buf) { + k = len(r.buf) + j = r.l - k + r.f + } + + n := copy(res, r.buf[r.f:k]) + copy(res[n:], r.buf[:j]) + + return res +} diff --git a/tsdbutil/buffer_test.go b/tsdbutil/buffer_test.go new file mode 100644 index 000000000..3f3dc9b52 --- /dev/null +++ b/tsdbutil/buffer_test.go @@ -0,0 +1,160 @@ +package tsdbutil + +import ( + "math/rand" + "sort" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestSampleRing(t *testing.T) { + cases := []struct { + input []int64 + delta int64 + size int + }{ + { + input: []int64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, + delta: 2, + size: 1, + }, + { + input: []int64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, + delta: 2, + size: 2, + }, + { + input: []int64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, + delta: 7, + size: 3, + }, + { + input: []int64{1, 2, 3, 4, 5, 16, 17, 18, 19, 20}, + delta: 7, + size: 1, + }, + } + for _, c := range cases { + r := newSampleRing(c.delta, c.size) + + input := []sample{} + for _, t := range c.input { + input = append(input, sample{ + t: t, + v: float64(rand.Intn(100)), + }) + } + + for i, s := range input { + r.add(s.t, s.v) + buffered := r.samples() + + for _, sold := range input[:i] { + found := false + for _, bs := range buffered { + if bs.t == sold.t && bs.v == sold.v { + found = true + break + } + } + if sold.t >= s.t-c.delta && !found { + t.Fatalf("%d: expected sample %d to be in buffer but was not; buffer %v", i, sold.t, buffered) + } + if sold.t < s.t-c.delta && found { + t.Fatalf("%d: unexpected sample %d in buffer; buffer %v", i, sold.t, buffered) + } + } + } + } +} + +func TestBufferedSeriesIterator(t *testing.T) { + var it *BufferedSeriesIterator + + bufferEq := func(exp []sample) { + var b []sample + bit := it.Buffer() + for bit.Next() { + t, v := bit.At() + b = append(b, sample{t: t, v: v}) + } + require.Equal(t, exp, b, "buffer mismatch") + } + sampleEq := func(ets int64, ev float64) { + ts, v := it.At() + require.Equal(t, ets, ts, "timestamp mismatch") + require.Equal(t, ev, v, "value mismatch") + } + + it = NewBuffer(newListSeriesIterator([]sample{ + {t: 1, v: 2}, + {t: 2, v: 3}, + {t: 3, v: 4}, + {t: 4, v: 5}, + {t: 5, v: 6}, + {t: 99, v: 8}, + {t: 100, v: 9}, + {t: 101, v: 10}, + }), 2) + + require.True(t, it.Seek(-123), "seek failed") + sampleEq(1, 2) + bufferEq(nil) + + require.True(t, it.Next(), "next failed") + sampleEq(2, 3) + bufferEq([]sample{{t: 1, v: 2}}) + + require.True(t, it.Next(), "next failed") + require.True(t, it.Next(), "next failed") + require.True(t, it.Next(), "next failed") + sampleEq(5, 6) + bufferEq([]sample{{t: 2, v: 3}, {t: 3, v: 4}, {t: 4, v: 5}}) + + require.True(t, it.Seek(5), "seek failed") + sampleEq(5, 6) + bufferEq([]sample{{t: 2, v: 3}, {t: 3, v: 4}, {t: 4, v: 5}}) + + require.True(t, it.Seek(101), "seek failed") + sampleEq(101, 10) + bufferEq([]sample{{t: 99, v: 8}, {t: 100, v: 9}}) + + require.False(t, it.Next(), "next succeeded unexpectedly") +} + +type listSeriesIterator struct { + list []sample + idx int +} + +func newListSeriesIterator(list []sample) *listSeriesIterator { + return &listSeriesIterator{list: list, idx: -1} +} + +func (it *listSeriesIterator) At() (int64, float64) { + s := it.list[it.idx] + return s.t, s.v +} + +func (it *listSeriesIterator) Next() bool { + it.idx++ + return it.idx < len(it.list) +} + +func (it *listSeriesIterator) Seek(t int64) bool { + if it.idx == -1 { + it.idx = 0 + } + // Do binary search between current position and end. + it.idx = sort.Search(len(it.list)-it.idx, func(i int) bool { + s := it.list[i+it.idx] + return s.t >= t + }) + + return it.idx < len(it.list) +} + +func (it *listSeriesIterator) Err() error { + return nil +}