From 83e11014dd003c1e752c944d44ff3a5e0f3abff1 Mon Sep 17 00:00:00 2001 From: Ganesh Vernekar <15064823+codesome@users.noreply.github.com> Date: Tue, 13 Sep 2022 19:36:32 +0530 Subject: [PATCH] Remove unnecessary tsdb/tsdbutil/buffer.go (#11302) Signed-off-by: Ganesh Vernekar Signed-off-by: Ganesh Vernekar --- tsdb/tsdbutil/buffer.go | 224 ----------------------------------- tsdb/tsdbutil/buffer_test.go | 180 ---------------------------- tsdb/tsdbutil/chunks.go | 13 ++ 3 files changed, 13 insertions(+), 404 deletions(-) delete mode 100644 tsdb/tsdbutil/buffer.go delete mode 100644 tsdb/tsdbutil/buffer_test.go diff --git a/tsdb/tsdbutil/buffer.go b/tsdb/tsdbutil/buffer.go deleted file mode 100644 index 3e136bb1d5..0000000000 --- a/tsdb/tsdbutil/buffer.go +++ /dev/null @@ -1,224 +0,0 @@ -// Copyright 2018 The Prometheus Authors -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package tsdbutil - -import ( - "math" - - "github.com/prometheus/prometheus/tsdb/chunkenc" -) - -// BufferedSeriesIterator wraps an iterator with a look-back buffer. -type BufferedSeriesIterator struct { - it chunkenc.Iterator - buf *sampleRing - - lastTime int64 -} - -// NewBuffer returns a new iterator that buffers the values within the time range -// of the current element and the duration of delta before. -func NewBuffer(it chunkenc.Iterator, delta int64) *BufferedSeriesIterator { - return &BufferedSeriesIterator{ - it: it, - buf: newSampleRing(delta, 16), - lastTime: math.MinInt64, - } -} - -// PeekBack returns the previous element of the iterator. If there is none buffered, -// ok is false. -func (b *BufferedSeriesIterator) PeekBack() (t int64, v float64, ok bool) { - return b.buf.last() -} - -// Buffer returns an iterator over the buffered data. -func (b *BufferedSeriesIterator) Buffer() chunkenc.Iterator { - return b.buf.iterator() -} - -// Seek advances the iterator to the element at time t or greater. -func (b *BufferedSeriesIterator) Seek(t int64) bool { - t0 := t - b.buf.delta - - // If the delta would cause us to seek backwards, preserve the buffer - // and just continue regular advancement while filling the buffer on the way. - if t0 > b.lastTime { - b.buf.reset() - - ok := b.it.Seek(t0) - if !ok { - return false - } - b.lastTime, _ = b.At() - } - - if b.lastTime >= t { - return true - } - for b.Next() { - if b.lastTime >= t { - return true - } - } - - return false -} - -// Next advances the iterator to the next element. -func (b *BufferedSeriesIterator) Next() bool { - // Add current element to buffer before advancing. - b.buf.add(b.it.At()) - - ok := b.it.Next() - if ok { - b.lastTime, _ = b.At() - } - return ok -} - -// At returns the current element of the iterator. -func (b *BufferedSeriesIterator) At() (int64, float64) { - return b.it.At() -} - -// Err returns the last encountered error. -func (b *BufferedSeriesIterator) Err() error { - return b.it.Err() -} - -type sample struct { - t int64 - v float64 -} - -func (s sample) T() int64 { - return s.t -} - -func (s sample) V() float64 { - return s.v -} - -type sampleRing struct { - delta int64 - - buf []sample // lookback buffer - i int // position of most recent element in ring buffer - f int // position of first element in ring buffer - l int // number of elements in buffer -} - -func newSampleRing(delta int64, sz int) *sampleRing { - r := &sampleRing{delta: delta, buf: make([]sample, sz)} - r.reset() - - return r -} - -func (r *sampleRing) reset() { - r.l = 0 - r.i = -1 - r.f = 0 -} - -func (r *sampleRing) iterator() chunkenc.Iterator { - return &sampleRingIterator{r: r, i: -1} -} - -type sampleRingIterator struct { - r *sampleRing - i int -} - -func (it *sampleRingIterator) Next() bool { - it.i++ - return it.i < it.r.l -} - -func (it *sampleRingIterator) Seek(int64) bool { - return false -} - -func (it *sampleRingIterator) Err() error { - return nil -} - -func (it *sampleRingIterator) At() (int64, float64) { - return it.r.at(it.i) -} - -func (r *sampleRing) at(i int) (int64, float64) { - j := (r.f + i) % len(r.buf) - s := r.buf[j] - return s.t, s.v -} - -// add adds a sample to the ring buffer and frees all samples that fall -// out of the delta range. -func (r *sampleRing) add(t int64, v float64) { - l := len(r.buf) - // Grow the ring buffer if it fits no more elements. - if l == r.l { - buf := make([]sample, 2*l) - copy(buf[l+r.f:], r.buf[r.f:]) - copy(buf, r.buf[:r.f]) - - r.buf = buf - r.i = r.f - r.f += l - } else { - r.i++ - if r.i >= l { - r.i -= l - } - } - - r.buf[r.i] = sample{t: t, v: v} - r.l++ - - // Free head of the buffer of samples that just fell out of the range. - for r.buf[r.f].t < t-r.delta { - r.f++ - if r.f >= l { - r.f -= l - } - r.l-- - } -} - -// last returns the most recent element added to the ring. -func (r *sampleRing) last() (int64, float64, bool) { - if r.l == 0 { - return 0, 0, false - } - s := r.buf[r.i] - return s.t, s.v, true -} - -func (r *sampleRing) samples() []sample { - res := make([]sample, r.l) - - k := r.f + r.l - var j int - if k > len(r.buf) { - k = len(r.buf) - j = r.l - k + r.f - } - - n := copy(res, r.buf[r.f:k]) - copy(res[n:], r.buf[:j]) - - return res -} diff --git a/tsdb/tsdbutil/buffer_test.go b/tsdb/tsdbutil/buffer_test.go deleted file mode 100644 index f194033607..0000000000 --- a/tsdb/tsdbutil/buffer_test.go +++ /dev/null @@ -1,180 +0,0 @@ -// Copyright 2018 The Prometheus Authors -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package tsdbutil - -import ( - "math/rand" - "sort" - "testing" - - "github.com/stretchr/testify/require" -) - -func TestSampleRing(t *testing.T) { - cases := []struct { - input []int64 - delta int64 - size int - }{ - { - input: []int64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, - delta: 2, - size: 1, - }, - { - input: []int64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, - delta: 2, - size: 2, - }, - { - input: []int64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, - delta: 7, - size: 3, - }, - { - input: []int64{1, 2, 3, 4, 5, 16, 17, 18, 19, 20}, - delta: 7, - size: 1, - }, - } - for _, c := range cases { - r := newSampleRing(c.delta, c.size) - - input := []sample{} - for _, t := range c.input { - input = append(input, sample{ - t: t, - v: float64(rand.Intn(100)), - }) - } - - for i, s := range input { - r.add(s.t, s.v) - buffered := r.samples() - - for _, sold := range input[:i] { - found := false - for _, bs := range buffered { - if bs.t == sold.t && bs.v == sold.v { - found = true - break - } - } - if sold.t >= s.t-c.delta && !found { - t.Fatalf("%d: expected sample %d to be in buffer but was not; buffer %v", i, sold.t, buffered) - } - if sold.t < s.t-c.delta && found { - t.Fatalf("%d: unexpected sample %d in buffer; buffer %v", i, sold.t, buffered) - } - } - } - } -} - -func TestBufferedSeriesIterator(t *testing.T) { - var it *BufferedSeriesIterator - - bufferEq := func(exp []sample) { - var b []sample - bit := it.Buffer() - for bit.Next() { - t, v := bit.At() - b = append(b, sample{t: t, v: v}) - } - require.Equal(t, exp, b) - } - sampleEq := func(ets int64, ev float64) { - ts, v := it.At() - require.Equal(t, ets, ts) - require.Equal(t, ev, v) - } - - it = NewBuffer(newListSeriesIterator([]sample{ - {t: 1, v: 2}, - {t: 2, v: 3}, - {t: 3, v: 4}, - {t: 4, v: 5}, - {t: 5, v: 6}, - {t: 99, v: 8}, - {t: 100, v: 9}, - {t: 101, v: 10}, - }), 2) - - require.True(t, it.Seek(-123), "seek failed") - sampleEq(1, 2) - bufferEq(nil) - - require.True(t, it.Next(), "next failed") - sampleEq(2, 3) - bufferEq([]sample{{t: 1, v: 2}}) - - require.True(t, it.Next(), "next failed") - require.True(t, it.Next(), "next failed") - require.True(t, it.Next(), "next failed") - sampleEq(5, 6) - bufferEq([]sample{{t: 2, v: 3}, {t: 3, v: 4}, {t: 4, v: 5}}) - - require.True(t, it.Seek(5), "seek failed") - sampleEq(5, 6) - bufferEq([]sample{{t: 2, v: 3}, {t: 3, v: 4}, {t: 4, v: 5}}) - - require.True(t, it.Seek(101), "seek failed") - sampleEq(101, 10) - bufferEq([]sample{{t: 99, v: 8}, {t: 100, v: 9}}) - - require.False(t, it.Next(), "next succeeded unexpectedly") -} - -type listSeriesIterator struct { - list []sample - idx int -} - -func newListSeriesIterator(list []sample) *listSeriesIterator { - return &listSeriesIterator{list: list, idx: -1} -} - -func (it *listSeriesIterator) At() (int64, float64) { - s := it.list[it.idx] - return s.t, s.v -} - -func (it *listSeriesIterator) Next() bool { - it.idx++ - return it.idx < len(it.list) -} - -func (it *listSeriesIterator) Seek(t int64) bool { - if it.idx == -1 { - it.idx = 0 - } - if it.idx >= len(it.list) { - return false - } - // No-op check. - if s := it.list[it.idx]; s.T() >= t { - return true - } - // Do binary search between current position and end. - it.idx += sort.Search(len(it.list)-it.idx, func(i int) bool { - s := it.list[i+it.idx] - return s.t >= t - }) - - return it.idx < len(it.list) -} - -func (it *listSeriesIterator) Err() error { - return nil -} diff --git a/tsdb/tsdbutil/chunks.go b/tsdb/tsdbutil/chunks.go index ffe9c05e00..541d592d48 100644 --- a/tsdb/tsdbutil/chunks.go +++ b/tsdb/tsdbutil/chunks.go @@ -57,6 +57,19 @@ func ChunkFromSamplesGeneric(s Samples) chunks.Meta { } } +type sample struct { + t int64 + v float64 +} + +func (s sample) T() int64 { + return s.t +} + +func (s sample) V() float64 { + return s.v +} + // PopulatedChunk creates a chunk populated with samples every second starting at minTime func PopulatedChunk(numSamples int, minTime int64) chunks.Meta { samples := make([]Sample, numSamples)