refactor util funcs to allow re-usage. (#419)

* refactor util funcs to allow reusage.

Signed-off-by: Krasi Georgiev <kgeorgie@redhat.com>
This commit is contained in:
Krasi Georgiev 2018-10-25 21:06:19 +01:00 committed by GitHub
parent a8351dc9d0
commit d804a27062
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 228 additions and 174 deletions

View file

@ -242,7 +242,7 @@ Outer:
res, err := q.Select(labels.NewEqualMatcher("a", "b")) res, err := q.Select(labels.NewEqualMatcher("a", "b"))
testutil.Ok(t, err) testutil.Ok(t, err)
expSamples := make([]sample, 0, len(c.remaint)) expSamples := make([]Sample, 0, len(c.remaint))
for _, ts := range c.remaint { for _, ts := range c.remaint {
expSamples = append(expSamples, sample{ts, smpls[ts]}) expSamples = append(expSamples, sample{ts, smpls[ts]})
} }
@ -469,7 +469,7 @@ Outer:
res, err := q.Select(labels.NewEqualMatcher("a", "b")) res, err := q.Select(labels.NewEqualMatcher("a", "b"))
testutil.Ok(t, err) testutil.Ok(t, err)
expSamples := make([]sample, 0, len(c.remaint)) expSamples := make([]Sample, 0, len(c.remaint))
for _, ts := range c.remaint { for _, ts := range c.remaint {
expSamples = append(expSamples, sample{ts, smpls[ts]}) expSamples = append(expSamples, sample{ts, smpls[ts]})
} }
@ -748,7 +748,7 @@ func TestTombstoneClean(t *testing.T) {
res, err := q.Select(labels.NewEqualMatcher("a", "b")) res, err := q.Select(labels.NewEqualMatcher("a", "b"))
testutil.Ok(t, err) testutil.Ok(t, err)
expSamples := make([]sample, 0, len(c.remaint)) expSamples := make([]Sample, 0, len(c.remaint))
for _, ts := range c.remaint { for _, ts := range c.remaint {
expSamples = append(expSamples, sample{ts, smpls[ts]}) expSamples = append(expSamples, sample{ts, smpls[ts]})
} }

View file

@ -1312,6 +1312,14 @@ type sample struct {
v float64 v float64
} }
func (s sample) T() int64 {
return s.t
}
func (s sample) V() float64 {
return s.v
}
// memSeries is the in-memory representation of a series. None of its methods // memSeries is the in-memory representation of a series. None of its methods
// are goroutine safe and it is the caller's responsibility to lock it. // are goroutine safe and it is the caller's responsibility to lock it.
type memSeries struct { type memSeries struct {

View file

@ -351,7 +351,7 @@ Outer:
res, err := q.Select(labels.NewEqualMatcher("a", "b")) res, err := q.Select(labels.NewEqualMatcher("a", "b"))
testutil.Ok(t, err) testutil.Ok(t, err)
expSamples := make([]sample, 0, len(c.remaint)) expSamples := make([]Sample, 0, len(c.remaint))
for _, ts := range c.remaint { for _, ts := range c.remaint {
expSamples = append(expSamples, sample{ts, smpls[ts]}) expSamples = append(expSamples, sample{ts, smpls[ts]})
} }
@ -470,9 +470,9 @@ func TestDelete_e2e(t *testing.T) {
{"job", "prom-k8s"}, {"job", "prom-k8s"},
}, },
} }
seriesMap := map[string][]sample{} seriesMap := map[string][]Sample{}
for _, l := range lbls { for _, l := range lbls {
seriesMap[labels.New(l...).String()] = []sample{} seriesMap[labels.New(l...).String()] = []Sample{}
} }
dir, _ := ioutil.TempDir("", "test") dir, _ := ioutil.TempDir("", "test")
defer os.RemoveAll(dir) defer os.RemoveAll(dir)
@ -481,7 +481,7 @@ func TestDelete_e2e(t *testing.T) {
app := hb.Appender() app := hb.Appender()
for _, l := range lbls { for _, l := range lbls {
ls := labels.New(l...) ls := labels.New(l...)
series := []sample{} series := []Sample{}
ts := rand.Int63n(300) ts := rand.Int63n(300)
for i := 0; i < numDatapoints; i++ { for i := 0; i < numDatapoints; i++ {
v := rand.Float64() v := rand.Float64()
@ -601,12 +601,12 @@ func boundedSamples(full []sample, mint, maxt int64) []sample {
return full return full
} }
func deletedSamples(full []sample, dranges Intervals) []sample { func deletedSamples(full []Sample, dranges Intervals) []Sample {
ds := make([]sample, 0, len(full)) ds := make([]Sample, 0, len(full))
Outer: Outer:
for _, s := range full { for _, s := range full {
for _, r := range dranges { for _, r := range dranges {
if r.inBounds(s.t) { if r.inBounds(s.T()) {
continue Outer continue Outer
} }
} }

View file

@ -29,6 +29,7 @@ import (
"github.com/prometheus/tsdb/index" "github.com/prometheus/tsdb/index"
"github.com/prometheus/tsdb/labels" "github.com/prometheus/tsdb/labels"
"github.com/prometheus/tsdb/testutil" "github.com/prometheus/tsdb/testutil"
"github.com/prometheus/tsdb/tsdbutil"
) )
type mockSeriesSet struct { type mockSeriesSet struct {
@ -72,7 +73,9 @@ type mockSeries struct {
iterator func() SeriesIterator iterator func() SeriesIterator
} }
func newSeries(l map[string]string, s []sample) Series { type Sample = tsdbutil.Sample
func newSeries(l map[string]string, s []Sample) Series {
return &mockSeries{ return &mockSeries{
labels: func() labels.Labels { return labels.FromMap(l) }, labels: func() labels.Labels { return labels.FromMap(l) },
iterator: func() SeriesIterator { return newListSeriesIterator(s) }, iterator: func() SeriesIterator { return newListSeriesIterator(s) },
@ -82,17 +85,17 @@ func (m *mockSeries) Labels() labels.Labels { return m.labels() }
func (m *mockSeries) Iterator() SeriesIterator { return m.iterator() } func (m *mockSeries) Iterator() SeriesIterator { return m.iterator() }
type listSeriesIterator struct { type listSeriesIterator struct {
list []sample list []Sample
idx int idx int
} }
func newListSeriesIterator(list []sample) *listSeriesIterator { func newListSeriesIterator(list []Sample) *listSeriesIterator {
return &listSeriesIterator{list: list, idx: -1} return &listSeriesIterator{list: list, idx: -1}
} }
func (it *listSeriesIterator) At() (int64, float64) { func (it *listSeriesIterator) At() (int64, float64) {
s := it.list[it.idx] s := it.list[it.idx]
return s.t, s.v return s.T(), s.V()
} }
func (it *listSeriesIterator) Next() bool { func (it *listSeriesIterator) Next() bool {
@ -107,7 +110,7 @@ func (it *listSeriesIterator) Seek(t int64) bool {
// Do binary search between current position and end. // Do binary search between current position and end.
it.idx = sort.Search(len(it.list)-it.idx, func(i int) bool { it.idx = sort.Search(len(it.list)-it.idx, func(i int) bool {
s := it.list[i+it.idx] s := it.list[i+it.idx]
return s.t >= t return s.T() >= t
}) })
return it.idx < len(it.list) return it.idx < len(it.list)
@ -131,33 +134,33 @@ func TestMergedSeriesSet(t *testing.T) {
a: newMockSeriesSet([]Series{ a: newMockSeriesSet([]Series{
newSeries(map[string]string{ newSeries(map[string]string{
"a": "a", "a": "a",
}, []sample{ }, []Sample{
{t: 1, v: 1}, sample{t: 1, v: 1},
}), }),
}), }),
b: newMockSeriesSet([]Series{ b: newMockSeriesSet([]Series{
newSeries(map[string]string{ newSeries(map[string]string{
"a": "a", "a": "a",
}, []sample{ }, []Sample{
{t: 2, v: 2}, sample{t: 2, v: 2},
}), }),
newSeries(map[string]string{ newSeries(map[string]string{
"b": "b", "b": "b",
}, []sample{ }, []Sample{
{t: 1, v: 1}, sample{t: 1, v: 1},
}), }),
}), }),
exp: newMockSeriesSet([]Series{ exp: newMockSeriesSet([]Series{
newSeries(map[string]string{ newSeries(map[string]string{
"a": "a", "a": "a",
}, []sample{ }, []Sample{
{t: 1, v: 1}, sample{t: 1, v: 1},
{t: 2, v: 2}, sample{t: 2, v: 2},
}), }),
newSeries(map[string]string{ newSeries(map[string]string{
"b": "b", "b": "b",
}, []sample{ }, []Sample{
{t: 1, v: 1}, sample{t: 1, v: 1},
}), }),
}), }),
}, },
@ -166,49 +169,49 @@ func TestMergedSeriesSet(t *testing.T) {
newSeries(map[string]string{ newSeries(map[string]string{
"handler": "prometheus", "handler": "prometheus",
"instance": "127.0.0.1:9090", "instance": "127.0.0.1:9090",
}, []sample{ }, []Sample{
{t: 1, v: 1}, sample{t: 1, v: 1},
}), }),
newSeries(map[string]string{ newSeries(map[string]string{
"handler": "prometheus", "handler": "prometheus",
"instance": "localhost:9090", "instance": "localhost:9090",
}, []sample{ }, []Sample{
{t: 1, v: 2}, sample{t: 1, v: 2},
}), }),
}), }),
b: newMockSeriesSet([]Series{ b: newMockSeriesSet([]Series{
newSeries(map[string]string{ newSeries(map[string]string{
"handler": "prometheus", "handler": "prometheus",
"instance": "127.0.0.1:9090", "instance": "127.0.0.1:9090",
}, []sample{ }, []Sample{
{t: 2, v: 1}, sample{t: 2, v: 1},
}), }),
newSeries(map[string]string{ newSeries(map[string]string{
"handler": "query", "handler": "query",
"instance": "localhost:9090", "instance": "localhost:9090",
}, []sample{ }, []Sample{
{t: 2, v: 2}, sample{t: 2, v: 2},
}), }),
}), }),
exp: newMockSeriesSet([]Series{ exp: newMockSeriesSet([]Series{
newSeries(map[string]string{ newSeries(map[string]string{
"handler": "prometheus", "handler": "prometheus",
"instance": "127.0.0.1:9090", "instance": "127.0.0.1:9090",
}, []sample{ }, []Sample{
{t: 1, v: 1}, sample{t: 1, v: 1},
{t: 2, v: 1}, sample{t: 2, v: 1},
}), }),
newSeries(map[string]string{ newSeries(map[string]string{
"handler": "prometheus", "handler": "prometheus",
"instance": "localhost:9090", "instance": "localhost:9090",
}, []sample{ }, []Sample{
{t: 1, v: 2}, sample{t: 1, v: 2},
}), }),
newSeries(map[string]string{ newSeries(map[string]string{
"handler": "query", "handler": "query",
"instance": "localhost:9090", "instance": "localhost:9090",
}, []sample{ }, []Sample{
{t: 2, v: 2}, sample{t: 2, v: 2},
}), }),
}), }),
}, },
@ -313,7 +316,7 @@ func createIdxChkReaders(tc []seriesSamples) (IndexReader, ChunkReader) {
} }
func TestBlockQuerier(t *testing.T) { func TestBlockQuerier(t *testing.T) {
newSeries := func(l map[string]string, s []sample) Series { newSeries := func(l map[string]string, s []Sample) Series {
return &mockSeries{ return &mockSeries{
labels: func() labels.Labels { return labels.FromMap(l) }, labels: func() labels.Labels { return labels.FromMap(l) },
iterator: func() SeriesIterator { return newListSeriesIterator(s) }, iterator: func() SeriesIterator { return newListSeriesIterator(s) },
@ -442,13 +445,13 @@ func TestBlockQuerier(t *testing.T) {
newSeries(map[string]string{ newSeries(map[string]string{
"a": "a", "a": "a",
}, },
[]sample{{2, 3}, {3, 4}, {5, 2}, {6, 3}}, []Sample{sample{2, 3}, sample{3, 4}, sample{5, 2}, sample{6, 3}},
), ),
newSeries(map[string]string{ newSeries(map[string]string{
"a": "a", "a": "a",
"b": "b", "b": "b",
}, },
[]sample{{2, 2}, {3, 3}, {5, 3}, {6, 6}}, []Sample{sample{2, 2}, sample{3, 3}, sample{5, 3}, sample{6, 6}},
), ),
}), }),
}, },
@ -461,13 +464,13 @@ func TestBlockQuerier(t *testing.T) {
"a": "ab", "a": "ab",
"p": "abce", "p": "abce",
}, },
[]sample{{2, 2}, {3, 3}, {5, 3}, {6, 6}}, []Sample{sample{2, 2}, sample{3, 3}, sample{5, 3}, sample{6, 6}},
), ),
newSeries(map[string]string{ newSeries(map[string]string{
"p": "abcd", "p": "abcd",
"x": "xyz", "x": "xyz",
}, },
[]sample{{2, 3}, {3, 4}, {5, 2}, {6, 3}}, []Sample{sample{2, 3}, sample{3, 4}, sample{5, 2}, sample{6, 3}},
), ),
}), }),
}, },
@ -513,7 +516,7 @@ Outer:
} }
func TestBlockQuerierDelete(t *testing.T) { func TestBlockQuerierDelete(t *testing.T) {
newSeries := func(l map[string]string, s []sample) Series { newSeries := func(l map[string]string, s []Sample) Series {
return &mockSeries{ return &mockSeries{
labels: func() labels.Labels { return labels.FromMap(l) }, labels: func() labels.Labels { return labels.FromMap(l) },
iterator: func() SeriesIterator { return newListSeriesIterator(s) }, iterator: func() SeriesIterator { return newListSeriesIterator(s) },
@ -588,13 +591,13 @@ func TestBlockQuerierDelete(t *testing.T) {
newSeries(map[string]string{ newSeries(map[string]string{
"a": "a", "a": "a",
}, },
[]sample{{5, 2}, {6, 3}, {7, 4}}, []Sample{sample{5, 2}, sample{6, 3}, sample{7, 4}},
), ),
newSeries(map[string]string{ newSeries(map[string]string{
"a": "a", "a": "a",
"b": "b", "b": "b",
}, },
[]sample{{4, 15}, {5, 3}}, []Sample{sample{4, 15}, sample{5, 3}},
), ),
}), }),
}, },
@ -607,12 +610,12 @@ func TestBlockQuerierDelete(t *testing.T) {
"a": "a", "a": "a",
"b": "b", "b": "b",
}, },
[]sample{{4, 15}, {5, 3}}, []Sample{sample{4, 15}, sample{5, 3}},
), ),
newSeries(map[string]string{ newSeries(map[string]string{
"b": "b", "b": "b",
}, },
[]sample{{2, 2}, {3, 6}, {5, 1}}, []Sample{sample{2, 2}, sample{3, 6}, sample{5, 1}},
), ),
}), }),
}, },
@ -625,7 +628,7 @@ func TestBlockQuerierDelete(t *testing.T) {
"a": "a", "a": "a",
"b": "b", "b": "b",
}, },
[]sample{{4, 15}}, []Sample{sample{4, 15}},
), ),
}), }),
}, },
@ -782,86 +785,69 @@ type itSeries struct {
func (s itSeries) Iterator() SeriesIterator { return s.si } func (s itSeries) Iterator() SeriesIterator { return s.si }
func (s itSeries) Labels() labels.Labels { return labels.Labels{} } func (s itSeries) Labels() labels.Labels { return labels.Labels{} }
func chunkFromSamples(s []sample) chunks.Meta {
mint, maxt := int64(0), int64(0)
if len(s) > 0 {
mint, maxt = s[0].t, s[len(s)-1].t
}
c := chunkenc.NewXORChunk()
ca, _ := c.Appender()
for _, s := range s {
ca.Append(s.t, s.v)
}
return chunks.Meta{
MinTime: mint,
MaxTime: maxt,
Chunk: c,
}
}
func TestSeriesIterator(t *testing.T) { func TestSeriesIterator(t *testing.T) {
itcases := []struct { itcases := []struct {
a, b, c []sample a, b, c []Sample
exp []sample exp []Sample
mint, maxt int64 mint, maxt int64
}{ }{
{ {
a: []sample{}, a: []Sample{},
b: []sample{}, b: []Sample{},
c: []sample{}, c: []Sample{},
exp: []sample{}, exp: []Sample{},
mint: math.MinInt64, mint: math.MinInt64,
maxt: math.MaxInt64, maxt: math.MaxInt64,
}, },
{ {
a: []sample{ a: []Sample{
{1, 2}, {2, 3}, {3, 5}, {6, 1}, sample{1, 2},
sample{2, 3},
sample{3, 5},
sample{6, 1},
}, },
b: []sample{}, b: []Sample{},
c: []sample{ c: []Sample{
{7, 89}, {9, 8}, sample{7, 89}, sample{9, 8},
}, },
exp: []sample{ exp: []Sample{
{1, 2}, {2, 3}, {3, 5}, {6, 1}, {7, 89}, {9, 8}, sample{1, 2}, sample{2, 3}, sample{3, 5}, sample{6, 1}, sample{7, 89}, sample{9, 8},
}, },
mint: math.MinInt64, mint: math.MinInt64,
maxt: math.MaxInt64, maxt: math.MaxInt64,
}, },
{ {
a: []sample{}, a: []Sample{},
b: []sample{ b: []Sample{
{1, 2}, {2, 3}, {3, 5}, {6, 1}, sample{1, 2}, sample{2, 3}, sample{3, 5}, sample{6, 1},
}, },
c: []sample{ c: []Sample{
{7, 89}, {9, 8}, sample{7, 89}, sample{9, 8},
}, },
exp: []sample{ exp: []Sample{
{1, 2}, {2, 3}, {3, 5}, {6, 1}, {7, 89}, {9, 8}, sample{1, 2}, sample{2, 3}, sample{3, 5}, sample{6, 1}, sample{7, 89}, sample{9, 8},
}, },
mint: 2, mint: 2,
maxt: 8, maxt: 8,
}, },
{ {
a: []sample{ a: []Sample{
{1, 2}, {2, 3}, {3, 5}, {6, 1}, sample{1, 2}, sample{2, 3}, sample{3, 5}, sample{6, 1},
}, },
b: []sample{ b: []Sample{
{7, 89}, {9, 8}, sample{7, 89}, sample{9, 8},
}, },
c: []sample{ c: []Sample{
{10, 22}, {203, 3493}, sample{10, 22}, sample{203, 3493},
}, },
exp: []sample{ exp: []Sample{
{1, 2}, {2, 3}, {3, 5}, {6, 1}, {7, 89}, {9, 8}, {10, 22}, {203, 3493}, sample{1, 2}, sample{2, 3}, sample{3, 5}, sample{6, 1}, sample{7, 89}, sample{9, 8}, sample{10, 22}, sample{203, 3493},
}, },
mint: 6, mint: 6,
maxt: 10, maxt: 10,
@ -869,30 +855,30 @@ func TestSeriesIterator(t *testing.T) {
} }
seekcases := []struct { seekcases := []struct {
a, b, c []sample a, b, c []Sample
seek int64 seek int64
success bool success bool
exp []sample exp []Sample
mint, maxt int64 mint, maxt int64
}{ }{
{ {
a: []sample{}, a: []Sample{},
b: []sample{}, b: []Sample{},
c: []sample{}, c: []Sample{},
seek: 0, seek: 0,
success: false, success: false,
exp: nil, exp: nil,
}, },
{ {
a: []sample{ a: []Sample{
{2, 3}, sample{2, 3},
}, },
b: []sample{}, b: []Sample{},
c: []sample{ c: []Sample{
{7, 89}, {9, 8}, sample{7, 89}, sample{9, 8},
}, },
seek: 10, seek: 10,
@ -902,56 +888,56 @@ func TestSeriesIterator(t *testing.T) {
maxt: math.MaxInt64, maxt: math.MaxInt64,
}, },
{ {
a: []sample{}, a: []Sample{},
b: []sample{ b: []Sample{
{1, 2}, {3, 5}, {6, 1}, sample{1, 2}, sample{3, 5}, sample{6, 1},
}, },
c: []sample{ c: []Sample{
{7, 89}, {9, 8}, sample{7, 89}, sample{9, 8},
}, },
seek: 2, seek: 2,
success: true, success: true,
exp: []sample{ exp: []Sample{
{3, 5}, {6, 1}, {7, 89}, {9, 8}, sample{3, 5}, sample{6, 1}, sample{7, 89}, sample{9, 8},
}, },
mint: 5, mint: 5,
maxt: 8, maxt: 8,
}, },
{ {
a: []sample{ a: []Sample{
{6, 1}, sample{6, 1},
}, },
b: []sample{ b: []Sample{
{9, 8}, sample{9, 8},
}, },
c: []sample{ c: []Sample{
{10, 22}, {203, 3493}, sample{10, 22}, sample{203, 3493},
}, },
seek: 10, seek: 10,
success: true, success: true,
exp: []sample{ exp: []Sample{
{10, 22}, {203, 3493}, sample{10, 22}, sample{203, 3493},
}, },
mint: 10, mint: 10,
maxt: 203, maxt: 203,
}, },
{ {
a: []sample{ a: []Sample{
{6, 1}, sample{6, 1},
}, },
b: []sample{ b: []Sample{
{9, 8}, sample{9, 8},
}, },
c: []sample{ c: []Sample{
{10, 22}, {203, 3493}, sample{10, 22}, sample{203, 3493},
}, },
seek: 203, seek: 203,
success: true, success: true,
exp: []sample{ exp: []Sample{
{203, 3493}, sample{203, 3493},
}, },
mint: 7, mint: 7,
maxt: 203, maxt: 203,
@ -961,16 +947,16 @@ func TestSeriesIterator(t *testing.T) {
t.Run("Chunk", func(t *testing.T) { t.Run("Chunk", func(t *testing.T) {
for _, tc := range itcases { for _, tc := range itcases {
chkMetas := []chunks.Meta{ chkMetas := []chunks.Meta{
chunkFromSamples(tc.a), tsdbutil.ChunkFromSamples(tc.a),
chunkFromSamples(tc.b), tsdbutil.ChunkFromSamples(tc.b),
chunkFromSamples(tc.c), tsdbutil.ChunkFromSamples(tc.c),
} }
res := newChunkSeriesIterator(chkMetas, nil, tc.mint, tc.maxt) res := newChunkSeriesIterator(chkMetas, nil, tc.mint, tc.maxt)
smplValid := make([]sample, 0) smplValid := make([]Sample, 0)
for _, s := range tc.exp { for _, s := range tc.exp {
if s.t >= tc.mint && s.t <= tc.maxt { if s.T() >= tc.mint && s.T() <= tc.maxt {
smplValid = append(smplValid, s) smplValid = append(smplValid, Sample(s))
} }
} }
exp := newListSeriesIterator(smplValid) exp := newListSeriesIterator(smplValid)
@ -984,23 +970,23 @@ func TestSeriesIterator(t *testing.T) {
t.Run("Seek", func(t *testing.T) { t.Run("Seek", func(t *testing.T) {
extra := []struct { extra := []struct {
a, b, c []sample a, b, c []Sample
seek int64 seek int64
success bool success bool
exp []sample exp []Sample
mint, maxt int64 mint, maxt int64
}{ }{
{ {
a: []sample{ a: []Sample{
{6, 1}, sample{6, 1},
}, },
b: []sample{ b: []Sample{
{9, 8}, sample{9, 8},
}, },
c: []sample{ c: []Sample{
{10, 22}, {203, 3493}, sample{10, 22}, sample{203, 3493},
}, },
seek: 203, seek: 203,
@ -1010,19 +996,19 @@ func TestSeriesIterator(t *testing.T) {
maxt: 202, maxt: 202,
}, },
{ {
a: []sample{ a: []Sample{
{6, 1}, sample{6, 1},
}, },
b: []sample{ b: []Sample{
{9, 8}, sample{9, 8},
}, },
c: []sample{ c: []Sample{
{10, 22}, {203, 3493}, sample{10, 22}, sample{203, 3493},
}, },
seek: 5, seek: 5,
success: true, success: true,
exp: []sample{{10, 22}}, exp: []Sample{sample{10, 22}},
mint: 10, mint: 10,
maxt: 202, maxt: 202,
}, },
@ -1032,16 +1018,16 @@ func TestSeriesIterator(t *testing.T) {
for _, tc := range seekcases2 { for _, tc := range seekcases2 {
chkMetas := []chunks.Meta{ chkMetas := []chunks.Meta{
chunkFromSamples(tc.a), tsdbutil.ChunkFromSamples(tc.a),
chunkFromSamples(tc.b), tsdbutil.ChunkFromSamples(tc.b),
chunkFromSamples(tc.c), tsdbutil.ChunkFromSamples(tc.c),
} }
res := newChunkSeriesIterator(chkMetas, nil, tc.mint, tc.maxt) res := newChunkSeriesIterator(chkMetas, nil, tc.mint, tc.maxt)
smplValid := make([]sample, 0) smplValid := make([]Sample, 0)
for _, s := range tc.exp { for _, s := range tc.exp {
if s.t >= tc.mint && s.t <= tc.maxt { if s.T() >= tc.mint && s.T() <= tc.maxt {
smplValid = append(smplValid, s) smplValid = append(smplValid, Sample(s))
} }
} }
exp := newListSeriesIterator(smplValid) exp := newListSeriesIterator(smplValid)
@ -1074,7 +1060,7 @@ func TestSeriesIterator(t *testing.T) {
itSeries{newListSeriesIterator(tc.c)} itSeries{newListSeriesIterator(tc.c)}
res := newChainedSeriesIterator(a, b, c) res := newChainedSeriesIterator(a, b, c)
exp := newListSeriesIterator(tc.exp) exp := newListSeriesIterator([]Sample(tc.exp))
smplExp, errExp := expandSeriesIterator(exp) smplExp, errExp := expandSeriesIterator(exp)
smplRes, errRes := expandSeriesIterator(res) smplRes, errRes := expandSeriesIterator(res)
@ -1119,9 +1105,9 @@ func TestSeriesIterator(t *testing.T) {
// Regression for: https://github.com/prometheus/tsdb/pull/97 // Regression for: https://github.com/prometheus/tsdb/pull/97
func TestChunkSeriesIterator_DoubleSeek(t *testing.T) { func TestChunkSeriesIterator_DoubleSeek(t *testing.T) {
chkMetas := []chunks.Meta{ chkMetas := []chunks.Meta{
chunkFromSamples([]sample{}), tsdbutil.ChunkFromSamples([]Sample{}),
chunkFromSamples([]sample{{1, 1}, {2, 2}, {3, 3}}), tsdbutil.ChunkFromSamples([]Sample{sample{1, 1}, sample{2, 2}, sample{3, 3}}),
chunkFromSamples([]sample{{4, 4}, {5, 5}}), tsdbutil.ChunkFromSamples([]Sample{sample{4, 4}, sample{5, 5}}),
} }
res := newChunkSeriesIterator(chkMetas, nil, 2, 8) res := newChunkSeriesIterator(chkMetas, nil, 2, 8)
@ -1136,9 +1122,9 @@ func TestChunkSeriesIterator_DoubleSeek(t *testing.T) {
// skipped to the end when seeking a value in the current chunk. // skipped to the end when seeking a value in the current chunk.
func TestChunkSeriesIterator_SeekInCurrentChunk(t *testing.T) { func TestChunkSeriesIterator_SeekInCurrentChunk(t *testing.T) {
metas := []chunks.Meta{ metas := []chunks.Meta{
chunkFromSamples([]sample{}), tsdbutil.ChunkFromSamples([]Sample{}),
chunkFromSamples([]sample{{1, 2}, {3, 4}, {5, 6}, {7, 8}}), tsdbutil.ChunkFromSamples([]Sample{sample{1, 2}, sample{3, 4}, sample{5, 6}, sample{7, 8}}),
chunkFromSamples([]sample{}), tsdbutil.ChunkFromSamples([]Sample{}),
} }
it := newChunkSeriesIterator(metas, nil, 1, 7) it := newChunkSeriesIterator(metas, nil, 1, 7)
@ -1158,7 +1144,7 @@ func TestChunkSeriesIterator_SeekInCurrentChunk(t *testing.T) {
// Seek gets called and advances beyond the max time, which was just accepted as a valid sample. // Seek gets called and advances beyond the max time, which was just accepted as a valid sample.
func TestChunkSeriesIterator_NextWithMinTime(t *testing.T) { func TestChunkSeriesIterator_NextWithMinTime(t *testing.T) {
metas := []chunks.Meta{ metas := []chunks.Meta{
chunkFromSamples([]sample{{1, 6}, {5, 6}, {7, 8}}), tsdbutil.ChunkFromSamples([]Sample{sample{1, 6}, sample{5, 6}, sample{7, 8}}),
} }
it := newChunkSeriesIterator(metas, nil, 2, 4) it := newChunkSeriesIterator(metas, nil, 2, 4)

View file

@ -2,13 +2,25 @@ package tsdbutil
import ( import (
"math" "math"
"github.com/prometheus/tsdb"
) )
// SeriesIterator iterates over the data of a time series.
type SeriesIterator interface {
// Seek advances the iterator forward to the given timestamp.
// If there's no value exactly at t, it advances to the first value
// after t.
Seek(t int64) bool
// At returns the current timestamp/value pair.
At() (t int64, v float64)
// Next advances the iterator by one.
Next() bool
// Err returns the current error.
Err() error
}
// BufferedSeriesIterator wraps an iterator with a look-back buffer. // BufferedSeriesIterator wraps an iterator with a look-back buffer.
type BufferedSeriesIterator struct { type BufferedSeriesIterator struct {
it tsdb.SeriesIterator it SeriesIterator
buf *sampleRing buf *sampleRing
lastTime int64 lastTime int64
@ -16,7 +28,7 @@ type BufferedSeriesIterator struct {
// NewBuffer returns a new iterator that buffers the values within the time range // NewBuffer returns a new iterator that buffers the values within the time range
// of the current element and the duration of delta before. // of the current element and the duration of delta before.
func NewBuffer(it tsdb.SeriesIterator, delta int64) *BufferedSeriesIterator { func NewBuffer(it SeriesIterator, delta int64) *BufferedSeriesIterator {
return &BufferedSeriesIterator{ return &BufferedSeriesIterator{
it: it, it: it,
buf: newSampleRing(delta, 16), buf: newSampleRing(delta, 16),
@ -31,7 +43,7 @@ func (b *BufferedSeriesIterator) PeekBack() (t int64, v float64, ok bool) {
} }
// Buffer returns an iterator over the buffered data. // Buffer returns an iterator over the buffered data.
func (b *BufferedSeriesIterator) Buffer() tsdb.SeriesIterator { func (b *BufferedSeriesIterator) Buffer() SeriesIterator {
return b.buf.iterator() return b.buf.iterator()
} }
@ -90,6 +102,14 @@ type sample struct {
v float64 v float64
} }
func (s sample) T() int64 {
return s.t
}
func (s sample) V() float64 {
return s.v
}
type sampleRing struct { type sampleRing struct {
delta int64 delta int64
@ -112,7 +132,7 @@ func (r *sampleRing) reset() {
r.f = 0 r.f = 0
} }
func (r *sampleRing) iterator() tsdb.SeriesIterator { func (r *sampleRing) iterator() SeriesIterator {
return &sampleRingIterator{r: r, i: -1} return &sampleRingIterator{r: r, i: -1}
} }

40
tsdbutil/chunks.go Normal file
View file

@ -0,0 +1,40 @@
package tsdbutil
import (
"github.com/prometheus/tsdb/chunkenc"
"github.com/prometheus/tsdb/chunks"
)
type Sample interface {
T() int64
V() float64
}
func ChunkFromSamples(s []Sample) chunks.Meta {
mint, maxt := int64(0), int64(0)
if len(s) > 0 {
mint, maxt = s[0].T(), s[len(s)-1].T()
}
c := chunkenc.NewXORChunk()
ca, _ := c.Appender()
for _, s := range s {
ca.Append(s.T(), s.V())
}
return chunks.Meta{
MinTime: mint,
MaxTime: maxt,
Chunk: c,
}
}
// PopulatedChunk creates a chunk populated with samples every second starting at minTime
func PopulatedChunk(numSamples int, minTime int64) chunks.Meta {
samples := make([]Sample, numSamples)
for i := 0; i < numSamples; i++ {
samples[i] = sample{minTime + int64(i*1000), 1.0}
}
return ChunkFromSamples(samples)
}