From b60c2068bc18b58383c5f6ac1086aa36b054e891 Mon Sep 17 00:00:00 2001 From: Goutham Veeramachaneni Date: Fri, 14 Apr 2017 00:36:14 +0530 Subject: [PATCH] Make blockQuerier return data in valid time-range Fixes #43 Added mint, maxt to chunkSeriesIterator. Adding a field there is inevitable as something similar is required for ignoring deleted time-ranges. Signed-off-by: Goutham Veeramachaneni --- querier.go | 34 ++++++++++++++++++++++++++++----- querier_test.go | 51 +++++++++++++++++++++++++++++++++++++++++++++---- 2 files changed, 76 insertions(+), 9 deletions(-) diff --git a/querier.go b/querier.go index 1759a9212..2e4f179a7 100644 --- a/querier.go +++ b/querier.go @@ -153,6 +153,9 @@ func (q *blockQuerier) Select(ms ...labels.Matcher) SeriesSet { mint: q.mint, maxt: q.maxt, }, + + mint: q.mint, + maxt: q.maxt, } } @@ -431,12 +434,14 @@ type blockSeriesSet struct { set chunkSeriesSet err error cur Series + + mint, maxt int64 } func (s *blockSeriesSet) Next() bool { for s.set.Next() { lset, chunks := s.set.At() - s.cur = &chunkSeries{labels: lset, chunks: chunks} + s.cur = &chunkSeries{labels: lset, chunks: chunks, mint: s.mint, maxt: s.maxt} return true } if s.set.Err() != nil { @@ -453,6 +458,8 @@ func (s *blockSeriesSet) Err() error { return s.err } type chunkSeries struct { labels labels.Labels chunks []*ChunkMeta // in-order chunk refs + + mint, maxt int64 } func (s *chunkSeries) Labels() labels.Labels { @@ -460,7 +467,7 @@ func (s *chunkSeries) Labels() labels.Labels { } func (s *chunkSeries) Iterator() SeriesIterator { - return newChunkSeriesIterator(s.chunks) + return newChunkSeriesIterator(s.chunks, s.mint, s.maxt) } // SeriesIterator iterates over the data of a time series. @@ -555,17 +562,30 @@ type chunkSeriesIterator struct { i int cur chunks.Iterator + + maxt, mint int64 } -func newChunkSeriesIterator(cs []*ChunkMeta) *chunkSeriesIterator { +func newChunkSeriesIterator(cs []*ChunkMeta, mint, maxt int64) *chunkSeriesIterator { return &chunkSeriesIterator{ chunks: cs, i: 0, cur: cs[0].Chunk.Iterator(), + + mint: mint, + maxt: maxt, } } +func (it *chunkSeriesIterator) inBounds(t int64) bool { + return t >= it.mint && t <= it.maxt +} + func (it *chunkSeriesIterator) Seek(t int64) (ok bool) { + if t >= it.maxt || t <= it.mint { + return false + } + // Only do binary search forward to stay in line with other iterators // that can only move forward. x := sort.Search(len(it.chunks[it.i:]), func(i int) bool { return it.chunks[i].MinTime >= t }) @@ -598,9 +618,13 @@ func (it *chunkSeriesIterator) At() (t int64, v float64) { } func (it *chunkSeriesIterator) Next() bool { - if it.cur.Next() { - return true + for it.cur.Next() { + t, _ := it.cur.At() + if it.inBounds(t) { + return true + } } + if err := it.cur.Err(); err != nil { return false } diff --git a/querier_test.go b/querier_test.go index dd8a0a731..e4fcf8afb 100644 --- a/querier_test.go +++ b/querier_test.go @@ -1,6 +1,7 @@ package tsdb import ( + "math" "math/rand" "sort" "testing" @@ -253,8 +254,14 @@ func createIdxChkReaders(tc []struct { } func TestBlockQuerier(t *testing.T) { - // Build the querier on data first. Then execute queries on it. + newSeries := func(l map[string]string, s []sample) Series { + return &mockSeries{ + labels: func() labels.Labels { return labels.FromMap(l) }, + iterator: func() SeriesIterator { return newListSeriesIterator(s) }, + } + } + // Build the querier on data first. Then execute queries on it. basedata := [][]struct { lset map[string]string chunks [][]sample @@ -283,7 +290,7 @@ func TestBlockQuerier(t *testing.T) { {1, 1}, {2, 2}, {3, 3}, }, { - {5, 3}, {6, 6}, {7, 5}, + {5, 3}, {6, 6}, }, }, }, @@ -318,6 +325,42 @@ func TestBlockQuerier(t *testing.T) { ms: []labels.Matcher{}, exp: newListSeriesSet([]Series{}), }, + { + dataIdx: 0, + + mint: 0, + maxt: 0, + ms: []labels.Matcher{labels.NewEqualMatcher("a", "a")}, + exp: newListSeriesSet([]Series{}), + }, + { + dataIdx: 0, + + mint: 1, + maxt: 0, + ms: []labels.Matcher{labels.NewEqualMatcher("a", "a")}, + exp: newListSeriesSet([]Series{}), + }, + { + dataIdx: 0, + + mint: 2, + maxt: 6, + ms: []labels.Matcher{labels.NewEqualMatcher("a", "a")}, + exp: newListSeriesSet([]Series{ + newSeries(map[string]string{ + "a": "a", + }, + []sample{{2, 3}, {3, 4}, {5, 2}, {6, 3}}, + ), + newSeries(map[string]string{ + "a": "a", + "b": "b", + }, + []sample{{2, 2}, {3, 3}, {5, 3}, {6, 6}}, + ), + }), + }, } Outer: @@ -667,7 +710,7 @@ func TestSeriesIterator(t *testing.T) { chunkFromSamples(tc.b), chunkFromSamples(tc.c), } - res := newChunkSeriesIterator(chkMetas) + res := newChunkSeriesIterator(chkMetas, math.MinInt64, math.MaxInt64) exp := newListSeriesIterator(tc.exp) smplExp, errExp := expandSeriesIterator(exp) @@ -684,7 +727,7 @@ func TestSeriesIterator(t *testing.T) { chunkFromSamples(tc.b), chunkFromSamples(tc.c), } - res := newChunkSeriesIterator(chkMetas) + res := newChunkSeriesIterator(chkMetas, math.MinInt64, math.MaxInt64) exp := newListSeriesIterator(tc.exp) require.Equal(t, tc.success, res.Seek(tc.seek))