Make blockQuerier return data in valid time-range

Fixes #43

Added mint, maxt to chunkSeriesIterator. Adding a field there is
inevitable as something similar is required for ignoring deleted
time-ranges.

Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in>
This commit is contained in:
Goutham Veeramachaneni 2017-04-14 00:36:14 +05:30
parent b9868c9f0b
commit b60c2068bc
No known key found for this signature in database
GPG key ID: F1C217E8E9023CAD
2 changed files with 76 additions and 9 deletions

View file

@ -153,6 +153,9 @@ func (q *blockQuerier) Select(ms ...labels.Matcher) SeriesSet {
mint: q.mint,
maxt: q.maxt,
},
mint: q.mint,
maxt: q.maxt,
}
}
@ -431,12 +434,14 @@ type blockSeriesSet struct {
set chunkSeriesSet
err error
cur Series
mint, maxt int64
}
func (s *blockSeriesSet) Next() bool {
for s.set.Next() {
lset, chunks := s.set.At()
s.cur = &chunkSeries{labels: lset, chunks: chunks}
s.cur = &chunkSeries{labels: lset, chunks: chunks, mint: s.mint, maxt: s.maxt}
return true
}
if s.set.Err() != nil {
@ -453,6 +458,8 @@ func (s *blockSeriesSet) Err() error { return s.err }
type chunkSeries struct {
labels labels.Labels
chunks []*ChunkMeta // in-order chunk refs
mint, maxt int64
}
func (s *chunkSeries) Labels() labels.Labels {
@ -460,7 +467,7 @@ func (s *chunkSeries) Labels() labels.Labels {
}
func (s *chunkSeries) Iterator() SeriesIterator {
return newChunkSeriesIterator(s.chunks)
return newChunkSeriesIterator(s.chunks, s.mint, s.maxt)
}
// SeriesIterator iterates over the data of a time series.
@ -555,17 +562,30 @@ type chunkSeriesIterator struct {
i int
cur chunks.Iterator
maxt, mint int64
}
func newChunkSeriesIterator(cs []*ChunkMeta) *chunkSeriesIterator {
func newChunkSeriesIterator(cs []*ChunkMeta, mint, maxt int64) *chunkSeriesIterator {
return &chunkSeriesIterator{
chunks: cs,
i: 0,
cur: cs[0].Chunk.Iterator(),
mint: mint,
maxt: maxt,
}
}
func (it *chunkSeriesIterator) inBounds(t int64) bool {
return t >= it.mint && t <= it.maxt
}
func (it *chunkSeriesIterator) Seek(t int64) (ok bool) {
if t >= it.maxt || t <= it.mint {
return false
}
// Only do binary search forward to stay in line with other iterators
// that can only move forward.
x := sort.Search(len(it.chunks[it.i:]), func(i int) bool { return it.chunks[i].MinTime >= t })
@ -598,9 +618,13 @@ func (it *chunkSeriesIterator) At() (t int64, v float64) {
}
func (it *chunkSeriesIterator) Next() bool {
if it.cur.Next() {
return true
for it.cur.Next() {
t, _ := it.cur.At()
if it.inBounds(t) {
return true
}
}
if err := it.cur.Err(); err != nil {
return false
}

View file

@ -1,6 +1,7 @@
package tsdb
import (
"math"
"math/rand"
"sort"
"testing"
@ -253,8 +254,14 @@ func createIdxChkReaders(tc []struct {
}
func TestBlockQuerier(t *testing.T) {
// Build the querier on data first. Then execute queries on it.
newSeries := func(l map[string]string, s []sample) Series {
return &mockSeries{
labels: func() labels.Labels { return labels.FromMap(l) },
iterator: func() SeriesIterator { return newListSeriesIterator(s) },
}
}
// Build the querier on data first. Then execute queries on it.
basedata := [][]struct {
lset map[string]string
chunks [][]sample
@ -283,7 +290,7 @@ func TestBlockQuerier(t *testing.T) {
{1, 1}, {2, 2}, {3, 3},
},
{
{5, 3}, {6, 6}, {7, 5},
{5, 3}, {6, 6},
},
},
},
@ -318,6 +325,42 @@ func TestBlockQuerier(t *testing.T) {
ms: []labels.Matcher{},
exp: newListSeriesSet([]Series{}),
},
{
dataIdx: 0,
mint: 0,
maxt: 0,
ms: []labels.Matcher{labels.NewEqualMatcher("a", "a")},
exp: newListSeriesSet([]Series{}),
},
{
dataIdx: 0,
mint: 1,
maxt: 0,
ms: []labels.Matcher{labels.NewEqualMatcher("a", "a")},
exp: newListSeriesSet([]Series{}),
},
{
dataIdx: 0,
mint: 2,
maxt: 6,
ms: []labels.Matcher{labels.NewEqualMatcher("a", "a")},
exp: newListSeriesSet([]Series{
newSeries(map[string]string{
"a": "a",
},
[]sample{{2, 3}, {3, 4}, {5, 2}, {6, 3}},
),
newSeries(map[string]string{
"a": "a",
"b": "b",
},
[]sample{{2, 2}, {3, 3}, {5, 3}, {6, 6}},
),
}),
},
}
Outer:
@ -667,7 +710,7 @@ func TestSeriesIterator(t *testing.T) {
chunkFromSamples(tc.b),
chunkFromSamples(tc.c),
}
res := newChunkSeriesIterator(chkMetas)
res := newChunkSeriesIterator(chkMetas, math.MinInt64, math.MaxInt64)
exp := newListSeriesIterator(tc.exp)
smplExp, errExp := expandSeriesIterator(exp)
@ -684,7 +727,7 @@ func TestSeriesIterator(t *testing.T) {
chunkFromSamples(tc.b),
chunkFromSamples(tc.c),
}
res := newChunkSeriesIterator(chkMetas)
res := newChunkSeriesIterator(chkMetas, math.MinInt64, math.MaxInt64)
exp := newListSeriesIterator(tc.exp)
require.Equal(t, tc.success, res.Seek(tc.seek))