Implement proper buffered iterator

This adds a proper duration based lookback buffer for series iterators
to allow advancing sequentially while remaining able to calculate time
aggregating functions such as `rate` backwards.

It uses an array ring buffer to minimize heap allocations for
potentially hundreds of thousands of series for a single query.
This commit is contained in:
Fabian Reinartz 2016-12-14 21:14:44 +01:00
parent ca89080128
commit e561c91d53
3 changed files with 180 additions and 27 deletions

View file

@ -16,7 +16,6 @@ import (
"github.com/fabxc/tsdb"
dto "github.com/prometheus/client_model/go"
"github.com/prometheus/common/expfmt"
"github.com/prometheus/common/log"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/storage/local"
"github.com/spf13/cobra"
@ -228,7 +227,7 @@ func (c *tsdbStorage) ingestScrape(ts int64, s *tsdb.Vector) error {
}
func newTSDBStorage(path string) (*tsdbStorage, error) {
c, err := tsdb.Open(path, log.Base(), nil)
c, err := tsdb.Open(path, nil, nil)
if err != nil {
return nil, err
}

View file

@ -514,47 +514,46 @@ func (it *chunkSeriesIterator) Err() error {
return it.cur.Err()
}
// BufferedSeriesIterator wraps an iterator with a look-back buffer.
type BufferedSeriesIterator struct {
// TODO(fabxc): time-based look back buffer for time-aggregating
// queries such as rate. It should allow us to re-use an iterator
// within a range query while calculating time-aggregates at any point.
//
// It also allows looking up/seeking at-or-before without modifying
// the simpler interface.
//
// Consider making this the main external interface.
it SeriesIterator
n int
buf []sample // lookback buffer
last sample
it SeriesIterator
buf *sampleRing
}
type sample struct {
t int64
v float64
}
func NewBufferedSeriesIterator(it SeriesIterator) *BufferedSeriesIterator {
// NewBuffer returns a new iterator that buffers the values within the time range
// of the current element and the duration of delta before.
func NewBuffer(it SeriesIterator, delta int64) *BufferedSeriesIterator {
return &BufferedSeriesIterator{
it: it,
it: it,
buf: newSampleRing(delta, 16),
}
}
// PeekBack returns the previous element of the iterator. If there is none buffered,
// ok is false.
func (b *BufferedSeriesIterator) PeekBack() (t int64, v float64, ok bool) {
return b.last.t, b.last.v, true
return b.buf.last()
}
// Seek advances the iterator to the element at time t or greater.
func (b *BufferedSeriesIterator) Seek(t int64) bool {
t0 := t - 20000 // TODO(fabxc): hard-coded 20s lookback, make configurable.
tcur, _ := b.it.Values()
t0 := t - b.buf.delta
// If the delta would cause us to seek backwards, preserve the buffer
// and just continue regular advancment.
if t0 <= tcur {
return b.Next()
}
b.buf.reset()
ok := b.it.Seek(t0)
if !ok {
return false
}
b.last.t, b.last.v = b.it.Values()
b.buf.add(b.it.Values())
// TODO(fabxc): skip to relevant chunk.
for b.Next() {
if ts, _ := b.Values(); ts >= t {
return true
@ -563,16 +562,109 @@ func (b *BufferedSeriesIterator) Seek(t int64) bool {
return false
}
// Next advances the iterator to the next element.
func (b *BufferedSeriesIterator) Next() bool {
b.last.t, b.last.v = b.it.Values()
// Add current element to buffer before advancing.
b.buf.add(b.it.Values())
return b.it.Next()
}
// Values returns the current element of the iterator.
func (b *BufferedSeriesIterator) Values() (int64, float64) {
return b.it.Values()
}
// Err returns the last encountered error.
func (b *BufferedSeriesIterator) Err() error {
return b.it.Err()
}
type sample struct {
t int64
v float64
}
type sampleRing struct {
delta int64
buf []sample // lookback buffer
i int // position of most recent element in ring buffer
f int // position of first element in ring buffer
l int // number of elements in buffer
}
func newSampleRing(delta int64, sz int) *sampleRing {
r := &sampleRing{delta: delta, buf: make([]sample, sz)}
r.reset()
return r
}
func (r *sampleRing) reset() {
r.l = 0
r.i = -1
r.f = 0
}
// add adds a sample to the ring buffer and frees all samples that fall
// out of the delta range.
func (r *sampleRing) add(t int64, v float64) {
l := len(r.buf)
// Grow the ring buffer if it fits no more elements.
if l == r.l {
buf := make([]sample, 2*l)
copy(buf[l+r.f:], r.buf[r.f:])
copy(buf, r.buf[:r.f])
r.buf = buf
r.i = r.f
r.f += l
} else {
r.i++
if r.i >= l {
r.i -= l
}
}
r.buf[r.i] = sample{t: t, v: v}
r.l++
// Free head of the buffer of samples that just fell out of the range.
for r.buf[r.f].t < t-r.delta {
r.f++
if r.f >= l {
r.f -= l
}
r.l--
}
}
// last returns the most recent element added to the ring.
func (r *sampleRing) last() (int64, float64, bool) {
if r.l == 0 {
return 0, 0, false
}
s := r.buf[r.i]
return s.t, s.v, true
}
func (r *sampleRing) samples() []sample {
res := make([]sample, 0, r.l)
var k = r.f + r.l
var j int
if k > len(r.buf) {
k = len(r.buf)
j = r.l - k + r.f
}
for _, s := range r.buf[r.f:k] {
res = append(res, s)
}
for _, s := range r.buf[:j] {
res = append(res, s)
}
return res
}

View file

@ -1,6 +1,7 @@
package tsdb
import (
"math/rand"
"testing"
"github.com/stretchr/testify/require"
@ -44,3 +45,64 @@ func TestCompareLabels(t *testing.T) {
require.Equal(t, c.res, compareLabels(a, b))
}
}
func TestSampleRing(t *testing.T) {
cases := []struct {
input []int64
delta int64
size int
}{
{
input: []int64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10},
delta: 2,
size: 1,
},
{
input: []int64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10},
delta: 2,
size: 2,
},
{
input: []int64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10},
delta: 7,
size: 3,
},
{
input: []int64{1, 2, 3, 4, 5, 16, 17, 18, 19, 20},
delta: 7,
size: 1,
},
}
for _, c := range cases {
r := newSampleRing(c.delta, c.size)
input := []sample{}
for _, t := range c.input {
input = append(input, sample{
t: t,
v: float64(rand.Intn(100)),
})
}
for i, s := range input {
r.add(s.t, s.v)
buffered := r.samples()
for _, sold := range input[:i] {
found := false
for _, bs := range buffered {
if bs.t == sold.t && bs.v == sold.v {
found = true
break
}
}
if sold.t >= s.t-c.delta && !found {
t.Fatalf("%d: expected sample %d to be in buffer but was not; buffer %v", i, sold.t, buffered)
}
if sold.t < s.t-c.delta && found {
t.Fatalf("%d: unexpected sample %d in buffer; buffer %v", i, sold.t, buffered)
}
}
}
}
}