diff --git a/chunks_test.go b/chunks_test.go index d5cacd9f3..ae9d69876 100644 --- a/chunks_test.go +++ b/chunks_test.go @@ -13,17 +13,22 @@ package tsdb -import "github.com/prometheus/tsdb/chunks" +import ( + "github.com/pkg/errors" + "github.com/prometheus/tsdb/chunks" +) -type mockChunkReader struct { - chunk func(ref uint64) (chunks.Chunk, error) - close func() error +type mockChunkReader map[uint64]chunks.Chunk + +func (cr mockChunkReader) Chunk(ref uint64) (chunks.Chunk, error) { + chk, ok := cr[ref] + if ok { + return chk, nil + } + + return nil, errors.New("Chunk with ref not found") } -func (cr *mockChunkReader) Chunk(ref uint64) (chunks.Chunk, error) { - return cr.chunk(ref) -} - -func (cr *mockChunkReader) Close() error { - return cr.close() +func (cr mockChunkReader) Close() error { + return nil } diff --git a/index_test.go b/index_test.go index 3a5240979..3522ca68e 100644 --- a/index_test.go +++ b/index_test.go @@ -277,6 +277,14 @@ func TestPersistence_index_e2e(t *testing.T) { } i++ } + + for k, v := range values { + vals := v.slice() + + require.NoError(t, iw.WriteLabelIndex([]string{k}, vals)) + require.NoError(t, mi.WriteLabelIndex([]string{k}, vals)) + } + all := make([]uint32, len(lbls)) for i := range all { all[i] = uint32(i) @@ -319,6 +327,24 @@ func TestPersistence_index_e2e(t *testing.T) { require.NoError(t, gotp.Err()) } - require.NoError(t, ir.Close()) + for k, v := range mi.labelIndex { + tplsExp, err := newStringTuples(v, 1) + require.NoError(t, err) + tplsRes, err := ir.LabelValues(k) + require.NoError(t, err) + + require.Equal(t, tplsExp.Len(), tplsRes.Len()) + for i := 0; i < tplsExp.Len(); i++ { + strsExp, err := tplsExp.At(i) + require.NoError(t, err) + + strsRes, err := tplsRes.At(i) + require.NoError(t, err) + + require.Equal(t, strsExp, strsRes) + } + } + + require.NoError(t, ir.Close()) } diff --git a/postings.go b/postings.go index 47a8ad077..180ac099d 100644 --- a/postings.go +++ b/postings.go @@ -152,30 +152,30 @@ func Merge(its ...Postings) Postings { a := its[0] for _, b := range its[1:] { - a = newMergePostings(a, b) + a = newMergedPostings(a, b) } return a } -type mergePostings struct { +type mergedPostings struct { a, b Postings aok, bok bool cur uint32 } -func newMergePostings(a, b Postings) *mergePostings { - it := &mergePostings{a: a, b: b} +func newMergedPostings(a, b Postings) *mergedPostings { + it := &mergedPostings{a: a, b: b} it.aok = it.a.Next() it.bok = it.b.Next() return it } -func (it *mergePostings) At() uint32 { +func (it *mergedPostings) At() uint32 { return it.cur } -func (it *mergePostings) Next() bool { +func (it *mergedPostings) Next() bool { if !it.aok && !it.bok { return false } @@ -210,13 +210,14 @@ func (it *mergePostings) Next() bool { return true } -func (it *mergePostings) Seek(id uint32) bool { +func (it *mergedPostings) Seek(id uint32) bool { it.aok = it.a.Seek(id) it.bok = it.b.Seek(id) + return it.Next() } -func (it *mergePostings) Err() error { +func (it *mergedPostings) Err() error { if it.a.Err() != nil { return it.a.Err() } diff --git a/postings_test.go b/postings_test.go index f690aaf74..d9154cab6 100644 --- a/postings_test.go +++ b/postings_test.go @@ -16,7 +16,6 @@ package tsdb import ( "encoding/binary" "math/rand" - "reflect" "testing" "github.com/stretchr/testify/require" @@ -60,17 +59,13 @@ func TestIntersect(t *testing.T) { }, } - for i, c := range cases { + for _, c := range cases { a := newListPostings(c.a) b := newListPostings(c.b) res, err := expandPostings(Intersect(a, b)) - if err != nil { - t.Fatalf("%d: Unexpected error: %s", i, err) - } - if !reflect.DeepEqual(res, c.res) { - t.Fatalf("%d: Expected %v but got %v", i, c.res, res) - } + require.NoError(t, err) + require.Equal(t, c.res, res) } } @@ -93,12 +88,9 @@ func TestMultiIntersect(t *testing.T) { pc := newListPostings(c.c) res, err := expandPostings(Intersect(pa, pb, pc)) - if err != nil { - t.Fatalf("Unexpected error: %s", err) - } - if !reflect.DeepEqual(res, c.res) { - t.Fatalf("Expected %v but got %v", c.res, res) - } + + require.NoError(t, err) + require.Equal(t, c.res, res) } } @@ -154,16 +146,12 @@ func TestMultiMerge(t *testing.T) { i3 := newListPostings(c.c) res, err := expandPostings(Merge(i1, i2, i3)) - if err != nil { - t.Fatalf("Unexpected error: %s", err) - } - if !reflect.DeepEqual(res, c.res) { - t.Fatalf("Expected %v but got %v", c.res, res) - } + require.NoError(t, err) + require.Equal(t, c.res, res) } } -func TestMerge(t *testing.T) { +func TestMergedPostings(t *testing.T) { var cases = []struct { a, b []uint32 res []uint32 @@ -189,14 +177,78 @@ func TestMerge(t *testing.T) { a := newListPostings(c.a) b := newListPostings(c.b) - res, err := expandPostings(newMergePostings(a, b)) - if err != nil { - t.Fatalf("Unexpected error: %s", err) - } - if !reflect.DeepEqual(res, c.res) { - t.Fatalf("Expected %v but got %v", c.res, res) + res, err := expandPostings(newMergedPostings(a, b)) + require.NoError(t, err) + require.Equal(t, c.res, res) + } + +} + +func TestMergedPostingsSeek(t *testing.T) { + var cases = []struct { + a, b []uint32 + + seek uint32 + success bool + res []uint32 + }{ + { + a: []uint32{1, 2, 3, 4, 5}, + b: []uint32{6, 7, 8, 9, 10}, + + seek: 0, + success: true, + res: []uint32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, + }, + { + a: []uint32{1, 2, 3, 4, 5}, + b: []uint32{6, 7, 8, 9, 10}, + + seek: 2, + success: true, + res: []uint32{2, 3, 4, 5, 6, 7, 8, 9, 10}, + }, + { + a: []uint32{1, 2, 3, 4, 5}, + b: []uint32{4, 5, 6, 7, 8}, + + seek: 9, + success: false, + res: nil, + }, + { + a: []uint32{1, 2, 3, 4, 9, 10}, + b: []uint32{1, 4, 5, 6, 7, 8, 10, 11}, + + seek: 10, + success: true, + res: []uint32{10, 11}, + }, + } + + for _, c := range cases { + a := newListPostings(c.a) + b := newListPostings(c.b) + + p := newMergedPostings(a, b) + + require.Equal(t, c.success, p.Seek(c.seek)) + + if c.success { + // check the current element and then proceed to check the rest. + i := 0 + require.Equal(t, c.res[i], p.At()) + + for p.Next() { + i++ + require.Equal(t, int(c.res[i]), int(p.At())) + } + + require.Equal(t, len(c.res)-1, i) } } + + return } func TestBigEndian(t *testing.T) { diff --git a/querier.go b/querier.go index 830da35b1..da3cf74ca 100644 --- a/querier.go +++ b/querier.go @@ -166,6 +166,9 @@ func (q *blockQuerier) Select(ms ...labels.Matcher) SeriesSet { mint: q.mint, maxt: q.maxt, }, + + mint: q.mint, + maxt: q.maxt, } } @@ -444,12 +447,14 @@ type blockSeriesSet struct { set chunkSeriesSet err error cur Series + + mint, maxt int64 } func (s *blockSeriesSet) Next() bool { for s.set.Next() { lset, chunks := s.set.At() - s.cur = &chunkSeries{labels: lset, chunks: chunks} + s.cur = &chunkSeries{labels: lset, chunks: chunks, mint: s.mint, maxt: s.maxt} return true } if s.set.Err() != nil { @@ -466,6 +471,8 @@ func (s *blockSeriesSet) Err() error { return s.err } type chunkSeries struct { labels labels.Labels chunks []*ChunkMeta // in-order chunk refs + + mint, maxt int64 } func (s *chunkSeries) Labels() labels.Labels { @@ -473,14 +480,14 @@ func (s *chunkSeries) Labels() labels.Labels { } func (s *chunkSeries) Iterator() SeriesIterator { - return newChunkSeriesIterator(s.chunks) + return newChunkSeriesIterator(s.chunks, s.mint, s.maxt) } // SeriesIterator iterates over the data of a time series. type SeriesIterator interface { // Seek advances the iterator forward to the given timestamp. - // If there's no value exactly at ts, it advances to the last value - // before tt. + // If there's no value exactly at t, it advances to the first value + // after t. Seek(t int64) bool // At returns the current timestamp/value pair. At() (t int64, v float64) @@ -501,7 +508,7 @@ func (s *chainedSeries) Labels() labels.Labels { } func (s *chainedSeries) Iterator() SeriesIterator { - return &chainedSeriesIterator{series: s.series} + return newChainedSeriesIterator(s.series...) } // chainedSeriesIterator implements a series iterater over a list @@ -513,6 +520,14 @@ type chainedSeriesIterator struct { cur SeriesIterator } +func newChainedSeriesIterator(s ...Series) *chainedSeriesIterator { + return &chainedSeriesIterator{ + series: s, + i: 0, + cur: s[0].Iterator(), + } +} + func (it *chainedSeriesIterator) Seek(t int64) bool { // We just scan the chained series sequentially as they are already // pre-selected by relevant time and should be accessed sequentially anyway. @@ -529,9 +544,6 @@ func (it *chainedSeriesIterator) Seek(t int64) bool { } func (it *chainedSeriesIterator) Next() bool { - if it.cur == nil { - it.cur = it.series[it.i].Iterator() - } if it.cur.Next() { return true } @@ -563,17 +575,35 @@ type chunkSeriesIterator struct { i int cur chunks.Iterator + + maxt, mint int64 } -func newChunkSeriesIterator(cs []*ChunkMeta) *chunkSeriesIterator { +func newChunkSeriesIterator(cs []*ChunkMeta, mint, maxt int64) *chunkSeriesIterator { return &chunkSeriesIterator{ chunks: cs, i: 0, cur: cs[0].Chunk.Iterator(), + + mint: mint, + maxt: maxt, } } +func (it *chunkSeriesIterator) inBounds(t int64) bool { + return t >= it.mint && t <= it.maxt +} + func (it *chunkSeriesIterator) Seek(t int64) (ok bool) { + if t > it.maxt { + return false + } + + // Seek to the first valid value after t. + if t < it.mint { + t = it.mint + } + // Only do binary search forward to stay in line with other iterators // that can only move forward. x := sort.Search(len(it.chunks[it.i:]), func(i int) bool { return it.chunks[i].MinTime >= t }) @@ -582,10 +612,10 @@ func (it *chunkSeriesIterator) Seek(t int64) (ok bool) { // If the timestamp was not found, it might be in the last chunk. if x == len(it.chunks) { x-- - } - // Go to previous chunk if the chunk doesn't exactly start with t. - // If we are already at the first chunk, we use it as it's the best we have. - if x > 0 && it.chunks[x].MinTime > t { + + // Go to previous chunk if the chunk doesn't exactly start with t. + // If we are already at the first chunk, we use it as it's the best we have. + } else if x > 0 && it.chunks[x].MinTime > t { x-- } @@ -606,9 +636,13 @@ func (it *chunkSeriesIterator) At() (t int64, v float64) { } func (it *chunkSeriesIterator) Next() bool { - if it.cur.Next() { - return true + for it.cur.Next() { + t, _ := it.cur.At() + if it.inBounds(t) { + return true + } } + if err := it.cur.Err(); err != nil { return false } diff --git a/querier_test.go b/querier_test.go index d97015884..f5b0336ff 100644 --- a/querier_test.go +++ b/querier_test.go @@ -14,9 +14,12 @@ package tsdb import ( + "math" + "math/rand" "sort" "testing" + "github.com/prometheus/tsdb/chunks" "github.com/prometheus/tsdb/labels" "github.com/stretchr/testify/require" ) @@ -213,3 +216,638 @@ func expandSeriesIterator(it SeriesIterator) (r []sample, err error) { return r, it.Err() } + +// Index: labels -> postings -> chunkMetas -> chunkRef +// ChunkReader: ref -> vals +func createIdxChkReaders(tc []struct { + lset map[string]string + chunks [][]sample +}) (IndexReader, ChunkReader) { + sort.Slice(tc, func(i, j int) bool { + return labels.Compare(labels.FromMap(tc[i].lset), labels.FromMap(tc[i].lset)) < 0 + }) + + postings := &memPostings{m: make(map[term][]uint32, 512)} + chkReader := mockChunkReader(make(map[uint64]chunks.Chunk)) + mi := newMockIndex() + + for i, s := range tc { + metas := make([]*ChunkMeta, 0, len(s.chunks)) + for _, chk := range s.chunks { + // Collisions can be there, but for tests, its fine. + ref := rand.Uint64() + + metas = append(metas, &ChunkMeta{ + MinTime: chk[0].t, + MaxTime: chk[len(chk)-1].t, + Ref: ref, + }) + + chunk := chunks.NewXORChunk() + app, _ := chunk.Appender() + for _, smpl := range chk { + app.Append(smpl.t, smpl.v) + } + chkReader[ref] = chunk + } + + mi.AddSeries(uint32(i), labels.FromMap(s.lset), metas...) + + postings.add(uint32(i), term{}) + for _, l := range labels.FromMap(s.lset) { + postings.add(uint32(i), term{l.Name, l.Value}) + } + } + + for tm := range postings.m { + mi.WritePostings(tm.name, tm.name, postings.get(tm)) + } + + return mi, chkReader +} + +func TestBlockQuerier(t *testing.T) { + newSeries := func(l map[string]string, s []sample) Series { + return &mockSeries{ + labels: func() labels.Labels { return labels.FromMap(l) }, + iterator: func() SeriesIterator { return newListSeriesIterator(s) }, + } + } + + type query struct { + mint, maxt int64 + ms []labels.Matcher + exp SeriesSet + } + + cases := struct { + data []struct { + lset map[string]string + chunks [][]sample + } + + queries []query + }{ + data: []struct { + lset map[string]string + chunks [][]sample + }{ + { + lset: map[string]string{ + "a": "a", + }, + chunks: [][]sample{ + { + {1, 2}, {2, 3}, {3, 4}, + }, + { + {5, 2}, {6, 3}, {7, 4}, + }, + }, + }, + { + lset: map[string]string{ + "a": "a", + "b": "b", + }, + chunks: [][]sample{ + { + {1, 1}, {2, 2}, {3, 3}, + }, + { + {5, 3}, {6, 6}, + }, + }, + }, + { + lset: map[string]string{ + "b": "b", + }, + chunks: [][]sample{ + { + {1, 3}, {2, 2}, {3, 6}, + }, + { + {5, 1}, {6, 7}, {7, 2}, + }, + }, + }, + }, + + queries: []query{ + { + mint: 0, + maxt: 0, + ms: []labels.Matcher{}, + exp: newListSeriesSet([]Series{}), + }, + { + mint: 0, + maxt: 0, + ms: []labels.Matcher{labels.NewEqualMatcher("a", "a")}, + exp: newListSeriesSet([]Series{}), + }, + { + mint: 1, + maxt: 0, + ms: []labels.Matcher{labels.NewEqualMatcher("a", "a")}, + exp: newListSeriesSet([]Series{}), + }, + { + mint: 2, + maxt: 6, + ms: []labels.Matcher{labels.NewEqualMatcher("a", "a")}, + exp: newListSeriesSet([]Series{ + newSeries(map[string]string{ + "a": "a", + }, + []sample{{2, 3}, {3, 4}, {5, 2}, {6, 3}}, + ), + newSeries(map[string]string{ + "a": "a", + "b": "b", + }, + []sample{{2, 2}, {3, 3}, {5, 3}, {6, 6}}, + ), + }), + }, + }, + } + +Outer: + for _, c := range cases.queries { + ir, cr := createIdxChkReaders(cases.data) + querier := &blockQuerier{ + index: ir, + chunks: cr, + + mint: c.mint, + maxt: c.maxt, + } + + res := querier.Select(c.ms...) + + for { + eok, rok := c.exp.Next(), res.Next() + require.Equal(t, eok, rok, "next") + + if !eok { + continue Outer + } + sexp := c.exp.At() + sres := res.At() + + require.Equal(t, sexp.Labels(), sres.Labels(), "labels") + + smplExp, errExp := expandSeriesIterator(sexp.Iterator()) + smplRes, errRes := expandSeriesIterator(sres.Iterator()) + + require.Equal(t, errExp, errRes, "samples error") + require.Equal(t, smplExp, smplRes, "samples") + } + } + + return +} + +func TestBaseChunkSeries(t *testing.T) { + type refdSeries struct { + lset labels.Labels + chunks []*ChunkMeta + + ref uint32 + } + + cases := []struct { + series []refdSeries + // Postings should be in the sorted order of the the series + postings []uint32 + + expIdxs []int + }{ + { + series: []refdSeries{ + { + lset: labels.New([]labels.Label{{"a", "a"}}...), + chunks: []*ChunkMeta{ + {Ref: 29}, {Ref: 45}, {Ref: 245}, {Ref: 123}, {Ref: 4232}, {Ref: 5344}, + {Ref: 121}, + }, + ref: 12, + }, + { + lset: labels.New([]labels.Label{{"a", "a"}, {"b", "b"}}...), + chunks: []*ChunkMeta{ + {Ref: 82}, {Ref: 23}, {Ref: 234}, {Ref: 65}, {Ref: 26}, + }, + ref: 10, + }, + { + lset: labels.New([]labels.Label{{"b", "c"}}...), + chunks: []*ChunkMeta{{Ref: 8282}}, + ref: 1, + }, + { + lset: labels.New([]labels.Label{{"b", "b"}}...), + chunks: []*ChunkMeta{ + {Ref: 829}, {Ref: 239}, {Ref: 2349}, {Ref: 659}, {Ref: 269}, + }, + ref: 108, + }, + }, + postings: []uint32{12, 10, 108}, + + expIdxs: []int{0, 1, 3}, + }, + { + series: []refdSeries{ + { + lset: labels.New([]labels.Label{{"a", "a"}, {"b", "b"}}...), + chunks: []*ChunkMeta{ + {Ref: 82}, {Ref: 23}, {Ref: 234}, {Ref: 65}, {Ref: 26}, + }, + ref: 10, + }, + { + lset: labels.New([]labels.Label{{"b", "c"}}...), + chunks: []*ChunkMeta{{Ref: 8282}}, + ref: 1, + }, + }, + postings: []uint32{}, + + expIdxs: []int{}, + }, + } + + for _, tc := range cases { + mi := newMockIndex() + for _, s := range tc.series { + mi.AddSeries(s.ref, s.lset, s.chunks...) + } + + bcs := &baseChunkSeries{ + p: newListPostings(tc.postings), + index: mi, + } + + i := 0 + for bcs.Next() { + lset, chks := bcs.At() + + idx := tc.expIdxs[i] + + require.Equal(t, tc.series[idx].lset, lset) + require.Equal(t, tc.series[idx].chunks, chks) + + i++ + } + require.Equal(t, len(tc.expIdxs), i) + require.NoError(t, bcs.Err()) + } + + return +} + +// TODO: Remove after simpleSeries is merged +type itSeries struct { + si SeriesIterator +} + +func (s itSeries) Iterator() SeriesIterator { return s.si } +func (s itSeries) Labels() labels.Labels { return labels.Labels{} } + +func chunkFromSamples(s []sample) *ChunkMeta { + mint, maxt := int64(0), int64(0) + + if len(s) > 0 { + mint, maxt = s[0].t, s[len(s)-1].t + } + + c := chunks.NewXORChunk() + ca, _ := c.Appender() + + for _, s := range s { + ca.Append(s.t, s.v) + } + return &ChunkMeta{ + MinTime: mint, + MaxTime: maxt, + + Chunk: c, + } +} + +func TestSeriesIterator(t *testing.T) { + itcases := []struct { + a, b, c []sample + exp []sample + + mint, maxt int64 + }{ + { + a: []sample{}, + b: []sample{}, + c: []sample{}, + + exp: []sample{}, + + mint: math.MinInt64, + maxt: math.MaxInt64, + }, + { + a: []sample{ + {1, 2}, {2, 3}, {3, 5}, {6, 1}, + }, + b: []sample{}, + c: []sample{ + {7, 89}, {9, 8}, + }, + + exp: []sample{ + {1, 2}, {2, 3}, {3, 5}, {6, 1}, {7, 89}, {9, 8}, + }, + mint: math.MinInt64, + maxt: math.MaxInt64, + }, + { + a: []sample{}, + b: []sample{ + {1, 2}, {2, 3}, {3, 5}, {6, 1}, + }, + c: []sample{ + {7, 89}, {9, 8}, + }, + + exp: []sample{ + {1, 2}, {2, 3}, {3, 5}, {6, 1}, {7, 89}, {9, 8}, + }, + mint: 2, + maxt: 8, + }, + { + a: []sample{ + {1, 2}, {2, 3}, {3, 5}, {6, 1}, + }, + b: []sample{ + {7, 89}, {9, 8}, + }, + c: []sample{ + {10, 22}, {203, 3493}, + }, + + exp: []sample{ + {1, 2}, {2, 3}, {3, 5}, {6, 1}, {7, 89}, {9, 8}, {10, 22}, {203, 3493}, + }, + mint: 6, + maxt: 10, + }, + } + + seekcases := []struct { + a, b, c []sample + + seek int64 + success bool + exp []sample + + mint, maxt int64 + }{ + { + a: []sample{}, + b: []sample{}, + c: []sample{}, + + seek: 0, + success: false, + exp: nil, + }, + { + a: []sample{ + {2, 3}, + }, + b: []sample{}, + c: []sample{ + {7, 89}, {9, 8}, + }, + + seek: 10, + success: false, + exp: nil, + mint: math.MinInt64, + maxt: math.MaxInt64, + }, + { + a: []sample{}, + b: []sample{ + {1, 2}, {3, 5}, {6, 1}, + }, + c: []sample{ + {7, 89}, {9, 8}, + }, + + seek: 2, + success: true, + exp: []sample{ + {3, 5}, {6, 1}, {7, 89}, {9, 8}, + }, + mint: 5, + maxt: 8, + }, + { + a: []sample{ + {6, 1}, + }, + b: []sample{ + {9, 8}, + }, + c: []sample{ + {10, 22}, {203, 3493}, + }, + + seek: 10, + success: true, + exp: []sample{ + {10, 22}, {203, 3493}, + }, + mint: 10, + maxt: 203, + }, + { + a: []sample{ + {6, 1}, + }, + b: []sample{ + {9, 8}, + }, + c: []sample{ + {10, 22}, {203, 3493}, + }, + + seek: 203, + success: true, + exp: []sample{ + {203, 3493}, + }, + mint: 7, + maxt: 203, + }, + } + + t.Run("Chunk", func(t *testing.T) { + for _, tc := range itcases { + chkMetas := []*ChunkMeta{ + chunkFromSamples(tc.a), + chunkFromSamples(tc.b), + chunkFromSamples(tc.c), + } + res := newChunkSeriesIterator(chkMetas, tc.mint, tc.maxt) + + smplValid := make([]sample, 0) + for _, s := range tc.exp { + if s.t >= tc.mint && s.t <= tc.maxt { + smplValid = append(smplValid, s) + } + } + exp := newListSeriesIterator(smplValid) + + smplExp, errExp := expandSeriesIterator(exp) + smplRes, errRes := expandSeriesIterator(res) + + require.Equal(t, errExp, errRes, "samples error") + require.Equal(t, smplExp, smplRes, "samples") + } + + t.Run("Seek", func(t *testing.T) { + extra := []struct { + a, b, c []sample + + seek int64 + success bool + exp []sample + + mint, maxt int64 + }{ + { + a: []sample{ + {6, 1}, + }, + b: []sample{ + {9, 8}, + }, + c: []sample{ + {10, 22}, {203, 3493}, + }, + + seek: 203, + success: false, + exp: nil, + mint: 2, + maxt: 202, + }, + { + a: []sample{ + {6, 1}, + }, + b: []sample{ + {9, 8}, + }, + c: []sample{ + {10, 22}, {203, 3493}, + }, + + seek: 5, + success: true, + exp: []sample{{10, 22}}, + mint: 10, + maxt: 202, + }, + } + + seekcases2 := append(seekcases, extra...) + + for _, tc := range seekcases2 { + chkMetas := []*ChunkMeta{ + chunkFromSamples(tc.a), + chunkFromSamples(tc.b), + chunkFromSamples(tc.c), + } + res := newChunkSeriesIterator(chkMetas, tc.mint, tc.maxt) + + smplValid := make([]sample, 0) + for _, s := range tc.exp { + if s.t >= tc.mint && s.t <= tc.maxt { + smplValid = append(smplValid, s) + } + } + exp := newListSeriesIterator(smplValid) + + require.Equal(t, tc.success, res.Seek(tc.seek)) + + if tc.success { + // Init the list and then proceed to check. + remaining := exp.Next() + require.True(t, remaining) + + for remaining { + sExp, eExp := exp.At() + sRes, eRes := res.At() + require.Equal(t, eExp, eRes, "samples error") + require.Equal(t, sExp, sRes, "samples") + + remaining = exp.Next() + require.Equal(t, remaining, res.Next()) + } + } + } + }) + }) + + t.Run("Chain", func(t *testing.T) { + for _, tc := range itcases { + a, b, c := itSeries{newListSeriesIterator(tc.a)}, + itSeries{newListSeriesIterator(tc.b)}, + itSeries{newListSeriesIterator(tc.c)} + + res := newChainedSeriesIterator(a, b, c) + exp := newListSeriesIterator(tc.exp) + + smplExp, errExp := expandSeriesIterator(exp) + smplRes, errRes := expandSeriesIterator(res) + + require.Equal(t, errExp, errRes, "samples error") + require.Equal(t, smplExp, smplRes, "samples") + } + + t.Run("Seek", func(t *testing.T) { + for _, tc := range seekcases { + a, b, c := itSeries{newListSeriesIterator(tc.a)}, + itSeries{newListSeriesIterator(tc.b)}, + itSeries{newListSeriesIterator(tc.c)} + + res := newChainedSeriesIterator(a, b, c) + exp := newListSeriesIterator(tc.exp) + + require.Equal(t, tc.success, res.Seek(tc.seek)) + + if tc.success { + // Init the list and then proceed to check. + remaining := exp.Next() + require.True(t, remaining) + + for remaining { + sExp, eExp := exp.At() + sRes, eRes := res.At() + require.Equal(t, eExp, eRes, "samples error") + require.Equal(t, sExp, sRes, "samples") + + remaining = exp.Next() + require.Equal(t, remaining, res.Next()) + } + } + } + }) + }) + + return +}