From f85d89abc0d1feaf7963ec249f63307e44ba4e15 Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Fri, 24 Mar 2017 10:20:39 +0100 Subject: [PATCH 1/2] Move BufferedSeriesIterator in own package This functionality is useful for a lot of clients but not relevant to the TSDB's core features. --- querier.go | 197 -------------------------------------- querier_test.go | 116 ----------------------- tsdbutil/buffer.go | 203 ++++++++++++++++++++++++++++++++++++++++ tsdbutil/buffer_test.go | 160 +++++++++++++++++++++++++++++++ 4 files changed, 363 insertions(+), 313 deletions(-) create mode 100644 tsdbutil/buffer.go create mode 100644 tsdbutil/buffer_test.go diff --git a/querier.go b/querier.go index d32cbc16a..3ee90dd6f 100644 --- a/querier.go +++ b/querier.go @@ -2,7 +2,6 @@ package tsdb import ( "fmt" - "math" "sort" "strings" @@ -605,202 +604,6 @@ func (it *chunkSeriesIterator) Err() error { return it.cur.Err() } -// BufferedSeriesIterator wraps an iterator with a look-back buffer. -type BufferedSeriesIterator struct { - it SeriesIterator - buf *sampleRing - - lastTime int64 -} - -// NewBuffer returns a new iterator that buffers the values within the time range -// of the current element and the duration of delta before. -func NewBuffer(it SeriesIterator, delta int64) *BufferedSeriesIterator { - return &BufferedSeriesIterator{ - it: it, - buf: newSampleRing(delta, 16), - lastTime: math.MinInt64, - } -} - -// PeekBack returns the previous element of the iterator. If there is none buffered, -// ok is false. -func (b *BufferedSeriesIterator) PeekBack() (t int64, v float64, ok bool) { - return b.buf.last() -} - -// Buffer returns an iterator over the buffered data. -func (b *BufferedSeriesIterator) Buffer() SeriesIterator { - return b.buf.iterator() -} - -// Seek advances the iterator to the element at time t or greater. -func (b *BufferedSeriesIterator) Seek(t int64) bool { - t0 := t - b.buf.delta - - // If the delta would cause us to seek backwards, preserve the buffer - // and just continue regular advancment while filling the buffer on the way. - if t0 > b.lastTime { - b.buf.reset() - - ok := b.it.Seek(t0) - if !ok { - return false - } - b.lastTime, _ = b.At() - } - - if b.lastTime >= t { - return true - } - for b.Next() { - if b.lastTime >= t { - return true - } - } - - return false -} - -// Next advances the iterator to the next element. -func (b *BufferedSeriesIterator) Next() bool { - // Add current element to buffer before advancing. - b.buf.add(b.it.At()) - - ok := b.it.Next() - if ok { - b.lastTime, _ = b.At() - } - return ok -} - -// At returns the current element of the iterator. -func (b *BufferedSeriesIterator) At() (int64, float64) { - return b.it.At() -} - -// Err returns the last encountered error. -func (b *BufferedSeriesIterator) Err() error { - return b.it.Err() -} - -type sample struct { - t int64 - v float64 -} - -type sampleRing struct { - delta int64 - - buf []sample // lookback buffer - i int // position of most recent element in ring buffer - f int // position of first element in ring buffer - l int // number of elements in buffer -} - -func newSampleRing(delta int64, sz int) *sampleRing { - r := &sampleRing{delta: delta, buf: make([]sample, sz)} - r.reset() - - return r -} - -func (r *sampleRing) reset() { - r.l = 0 - r.i = -1 - r.f = 0 -} - -func (r *sampleRing) iterator() SeriesIterator { - return &sampleRingIterator{r: r, i: -1} -} - -type sampleRingIterator struct { - r *sampleRing - i int -} - -func (it *sampleRingIterator) Next() bool { - it.i++ - return it.i < it.r.l -} - -func (it *sampleRingIterator) Seek(int64) bool { - return false -} - -func (it *sampleRingIterator) Err() error { - return nil -} - -func (it *sampleRingIterator) At() (int64, float64) { - return it.r.at(it.i) -} - -func (r *sampleRing) at(i int) (int64, float64) { - j := (r.f + i) % len(r.buf) - s := r.buf[j] - return s.t, s.v -} - -// add adds a sample to the ring buffer and frees all samples that fall -// out of the delta range. -func (r *sampleRing) add(t int64, v float64) { - l := len(r.buf) - // Grow the ring buffer if it fits no more elements. - if l == r.l { - buf := make([]sample, 2*l) - copy(buf[l+r.f:], r.buf[r.f:]) - copy(buf, r.buf[:r.f]) - - r.buf = buf - r.i = r.f - r.f += l - } else { - r.i++ - if r.i >= l { - r.i -= l - } - } - - r.buf[r.i] = sample{t: t, v: v} - r.l++ - - // Free head of the buffer of samples that just fell out of the range. - for r.buf[r.f].t < t-r.delta { - r.f++ - if r.f >= l { - r.f -= l - } - r.l-- - } -} - -// last returns the most recent element added to the ring. -func (r *sampleRing) last() (int64, float64, bool) { - if r.l == 0 { - return 0, 0, false - } - s := r.buf[r.i] - return s.t, s.v, true -} - -func (r *sampleRing) samples() []sample { - res := make([]sample, r.l) - - var k = r.f + r.l - var j int - if k > len(r.buf) { - k = len(r.buf) - j = r.l - k + r.f - } - - n := copy(res, r.buf[r.f:k]) - copy(res[n:], r.buf[:j]) - - return res -} - type mockSeriesSet struct { next func() bool series func() Series diff --git a/querier_test.go b/querier_test.go index 41fafd5ae..6330df657 100644 --- a/querier_test.go +++ b/querier_test.go @@ -1,7 +1,6 @@ package tsdb import ( - "math/rand" "sort" "testing" @@ -201,118 +200,3 @@ func expandSeriesIterator(it SeriesIterator) (r []sample, err error) { return r, it.Err() } - -func TestSampleRing(t *testing.T) { - cases := []struct { - input []int64 - delta int64 - size int - }{ - { - input: []int64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, - delta: 2, - size: 1, - }, - { - input: []int64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, - delta: 2, - size: 2, - }, - { - input: []int64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, - delta: 7, - size: 3, - }, - { - input: []int64{1, 2, 3, 4, 5, 16, 17, 18, 19, 20}, - delta: 7, - size: 1, - }, - } - for _, c := range cases { - r := newSampleRing(c.delta, c.size) - - input := []sample{} - for _, t := range c.input { - input = append(input, sample{ - t: t, - v: float64(rand.Intn(100)), - }) - } - - for i, s := range input { - r.add(s.t, s.v) - buffered := r.samples() - - for _, sold := range input[:i] { - found := false - for _, bs := range buffered { - if bs.t == sold.t && bs.v == sold.v { - found = true - break - } - } - if sold.t >= s.t-c.delta && !found { - t.Fatalf("%d: expected sample %d to be in buffer but was not; buffer %v", i, sold.t, buffered) - } - if sold.t < s.t-c.delta && found { - t.Fatalf("%d: unexpected sample %d in buffer; buffer %v", i, sold.t, buffered) - } - } - } - } -} - -func TestBufferedSeriesIterator(t *testing.T) { - var it *BufferedSeriesIterator - - bufferEq := func(exp []sample) { - var b []sample - bit := it.Buffer() - for bit.Next() { - t, v := bit.At() - b = append(b, sample{t: t, v: v}) - } - require.Equal(t, exp, b, "buffer mismatch") - } - sampleEq := func(ets int64, ev float64) { - ts, v := it.At() - require.Equal(t, ets, ts, "timestamp mismatch") - require.Equal(t, ev, v, "value mismatch") - } - - it = NewBuffer(newListSeriesIterator([]sample{ - {t: 1, v: 2}, - {t: 2, v: 3}, - {t: 3, v: 4}, - {t: 4, v: 5}, - {t: 5, v: 6}, - {t: 99, v: 8}, - {t: 100, v: 9}, - {t: 101, v: 10}, - }), 2) - - require.True(t, it.Seek(-123), "seek failed") - sampleEq(1, 2) - bufferEq(nil) - - require.True(t, it.Next(), "next failed") - sampleEq(2, 3) - bufferEq([]sample{{t: 1, v: 2}}) - - require.True(t, it.Next(), "next failed") - require.True(t, it.Next(), "next failed") - require.True(t, it.Next(), "next failed") - sampleEq(5, 6) - bufferEq([]sample{{t: 2, v: 3}, {t: 3, v: 4}, {t: 4, v: 5}}) - - require.True(t, it.Seek(5), "seek failed") - sampleEq(5, 6) - bufferEq([]sample{{t: 2, v: 3}, {t: 3, v: 4}, {t: 4, v: 5}}) - - require.True(t, it.Seek(101), "seek failed") - sampleEq(101, 10) - bufferEq([]sample{{t: 99, v: 8}, {t: 100, v: 9}}) - - require.False(t, it.Next(), "next succeeded unexpectedly") -} diff --git a/tsdbutil/buffer.go b/tsdbutil/buffer.go new file mode 100644 index 000000000..da6fa84d6 --- /dev/null +++ b/tsdbutil/buffer.go @@ -0,0 +1,203 @@ +package tsdbutil + +import ( + "math" + + "github.com/fabxc/tsdb" +) + +// BufferedSeriesIterator wraps an iterator with a look-back buffer. +type BufferedSeriesIterator struct { + it tsdb.SeriesIterator + buf *sampleRing + + lastTime int64 +} + +// NewBuffer returns a new iterator that buffers the values within the time range +// of the current element and the duration of delta before. +func NewBuffer(it tsdb.SeriesIterator, delta int64) *BufferedSeriesIterator { + return &BufferedSeriesIterator{ + it: it, + buf: newSampleRing(delta, 16), + lastTime: math.MinInt64, + } +} + +// PeekBack returns the previous element of the iterator. If there is none buffered, +// ok is false. +func (b *BufferedSeriesIterator) PeekBack() (t int64, v float64, ok bool) { + return b.buf.last() +} + +// Buffer returns an iterator over the buffered data. +func (b *BufferedSeriesIterator) Buffer() tsdb.SeriesIterator { + return b.buf.iterator() +} + +// Seek advances the iterator to the element at time t or greater. +func (b *BufferedSeriesIterator) Seek(t int64) bool { + t0 := t - b.buf.delta + + // If the delta would cause us to seek backwards, preserve the buffer + // and just continue regular advancment while filling the buffer on the way. + if t0 > b.lastTime { + b.buf.reset() + + ok := b.it.Seek(t0) + if !ok { + return false + } + b.lastTime, _ = b.At() + } + + if b.lastTime >= t { + return true + } + for b.Next() { + if b.lastTime >= t { + return true + } + } + + return false +} + +// Next advances the iterator to the next element. +func (b *BufferedSeriesIterator) Next() bool { + // Add current element to buffer before advancing. + b.buf.add(b.it.At()) + + ok := b.it.Next() + if ok { + b.lastTime, _ = b.At() + } + return ok +} + +// At returns the current element of the iterator. +func (b *BufferedSeriesIterator) At() (int64, float64) { + return b.it.At() +} + +// Err returns the last encountered error. +func (b *BufferedSeriesIterator) Err() error { + return b.it.Err() +} + +type sample struct { + t int64 + v float64 +} + +type sampleRing struct { + delta int64 + + buf []sample // lookback buffer + i int // position of most recent element in ring buffer + f int // position of first element in ring buffer + l int // number of elements in buffer +} + +func newSampleRing(delta int64, sz int) *sampleRing { + r := &sampleRing{delta: delta, buf: make([]sample, sz)} + r.reset() + + return r +} + +func (r *sampleRing) reset() { + r.l = 0 + r.i = -1 + r.f = 0 +} + +func (r *sampleRing) iterator() tsdb.SeriesIterator { + return &sampleRingIterator{r: r, i: -1} +} + +type sampleRingIterator struct { + r *sampleRing + i int +} + +func (it *sampleRingIterator) Next() bool { + it.i++ + return it.i < it.r.l +} + +func (it *sampleRingIterator) Seek(int64) bool { + return false +} + +func (it *sampleRingIterator) Err() error { + return nil +} + +func (it *sampleRingIterator) At() (int64, float64) { + return it.r.at(it.i) +} + +func (r *sampleRing) at(i int) (int64, float64) { + j := (r.f + i) % len(r.buf) + s := r.buf[j] + return s.t, s.v +} + +// add adds a sample to the ring buffer and frees all samples that fall +// out of the delta range. +func (r *sampleRing) add(t int64, v float64) { + l := len(r.buf) + // Grow the ring buffer if it fits no more elements. + if l == r.l { + buf := make([]sample, 2*l) + copy(buf[l+r.f:], r.buf[r.f:]) + copy(buf, r.buf[:r.f]) + + r.buf = buf + r.i = r.f + r.f += l + } else { + r.i++ + if r.i >= l { + r.i -= l + } + } + + r.buf[r.i] = sample{t: t, v: v} + r.l++ + + // Free head of the buffer of samples that just fell out of the range. + for r.buf[r.f].t < t-r.delta { + r.f++ + if r.f >= l { + r.f -= l + } + r.l-- + } +} + +// last returns the most recent element added to the ring. +func (r *sampleRing) last() (int64, float64, bool) { + if r.l == 0 { + return 0, 0, false + } + s := r.buf[r.i] + return s.t, s.v, true +} + +func (r *sampleRing) samples() []sample { + res := make([]sample, r.l) + + var k = r.f + r.l + var j int + if k > len(r.buf) { + k = len(r.buf) + j = r.l - k + r.f + } + + n := copy(res, r.buf[r.f:k]) + copy(res[n:], r.buf[:j]) + + return res +} diff --git a/tsdbutil/buffer_test.go b/tsdbutil/buffer_test.go new file mode 100644 index 000000000..3f3dc9b52 --- /dev/null +++ b/tsdbutil/buffer_test.go @@ -0,0 +1,160 @@ +package tsdbutil + +import ( + "math/rand" + "sort" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestSampleRing(t *testing.T) { + cases := []struct { + input []int64 + delta int64 + size int + }{ + { + input: []int64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, + delta: 2, + size: 1, + }, + { + input: []int64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, + delta: 2, + size: 2, + }, + { + input: []int64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, + delta: 7, + size: 3, + }, + { + input: []int64{1, 2, 3, 4, 5, 16, 17, 18, 19, 20}, + delta: 7, + size: 1, + }, + } + for _, c := range cases { + r := newSampleRing(c.delta, c.size) + + input := []sample{} + for _, t := range c.input { + input = append(input, sample{ + t: t, + v: float64(rand.Intn(100)), + }) + } + + for i, s := range input { + r.add(s.t, s.v) + buffered := r.samples() + + for _, sold := range input[:i] { + found := false + for _, bs := range buffered { + if bs.t == sold.t && bs.v == sold.v { + found = true + break + } + } + if sold.t >= s.t-c.delta && !found { + t.Fatalf("%d: expected sample %d to be in buffer but was not; buffer %v", i, sold.t, buffered) + } + if sold.t < s.t-c.delta && found { + t.Fatalf("%d: unexpected sample %d in buffer; buffer %v", i, sold.t, buffered) + } + } + } + } +} + +func TestBufferedSeriesIterator(t *testing.T) { + var it *BufferedSeriesIterator + + bufferEq := func(exp []sample) { + var b []sample + bit := it.Buffer() + for bit.Next() { + t, v := bit.At() + b = append(b, sample{t: t, v: v}) + } + require.Equal(t, exp, b, "buffer mismatch") + } + sampleEq := func(ets int64, ev float64) { + ts, v := it.At() + require.Equal(t, ets, ts, "timestamp mismatch") + require.Equal(t, ev, v, "value mismatch") + } + + it = NewBuffer(newListSeriesIterator([]sample{ + {t: 1, v: 2}, + {t: 2, v: 3}, + {t: 3, v: 4}, + {t: 4, v: 5}, + {t: 5, v: 6}, + {t: 99, v: 8}, + {t: 100, v: 9}, + {t: 101, v: 10}, + }), 2) + + require.True(t, it.Seek(-123), "seek failed") + sampleEq(1, 2) + bufferEq(nil) + + require.True(t, it.Next(), "next failed") + sampleEq(2, 3) + bufferEq([]sample{{t: 1, v: 2}}) + + require.True(t, it.Next(), "next failed") + require.True(t, it.Next(), "next failed") + require.True(t, it.Next(), "next failed") + sampleEq(5, 6) + bufferEq([]sample{{t: 2, v: 3}, {t: 3, v: 4}, {t: 4, v: 5}}) + + require.True(t, it.Seek(5), "seek failed") + sampleEq(5, 6) + bufferEq([]sample{{t: 2, v: 3}, {t: 3, v: 4}, {t: 4, v: 5}}) + + require.True(t, it.Seek(101), "seek failed") + sampleEq(101, 10) + bufferEq([]sample{{t: 99, v: 8}, {t: 100, v: 9}}) + + require.False(t, it.Next(), "next succeeded unexpectedly") +} + +type listSeriesIterator struct { + list []sample + idx int +} + +func newListSeriesIterator(list []sample) *listSeriesIterator { + return &listSeriesIterator{list: list, idx: -1} +} + +func (it *listSeriesIterator) At() (int64, float64) { + s := it.list[it.idx] + return s.t, s.v +} + +func (it *listSeriesIterator) Next() bool { + it.idx++ + return it.idx < len(it.list) +} + +func (it *listSeriesIterator) Seek(t int64) bool { + if it.idx == -1 { + it.idx = 0 + } + // Do binary search between current position and end. + it.idx = sort.Search(len(it.list)-it.idx, func(i int) bool { + s := it.list[i+it.idx] + return s.t >= t + }) + + return it.idx < len(it.list) +} + +func (it *listSeriesIterator) Err() error { + return nil +} From 61f866bb942d76955e3453e412d2e2a81f860176 Mon Sep 17 00:00:00 2001 From: Goutham Veeramachaneni Date: Sun, 26 Mar 2017 23:22:58 +0530 Subject: [PATCH 2/2] Add Sample Back The compilation and tests are broken as head.go requires sample which has been moved to another package while moving BufferedSeriesIterator. Duplication seemed better compared to exposing sample from tsdbutil. --- head.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/head.go b/head.go index 7ec2f2eae..cadc4a68b 100644 --- a/head.go +++ b/head.go @@ -34,6 +34,11 @@ var ( ErrOutOfBounds = errors.New("out of bounds") ) +type sample struct { + t int64 + v float64 +} + // headBlock handles reads and writes of time series data within a time window. type headBlock struct { mtx sync.RWMutex