From 9b400b4c5884d8cd942d6c57b92f8b8159c7f4c0 Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Mon, 12 Dec 2016 19:12:55 +0100 Subject: [PATCH] Add chunk based series iterator --- querier.go | 67 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ reader.go | 13 ++++++++--- 2 files changed, 77 insertions(+), 3 deletions(-) diff --git a/querier.go b/querier.go index d64f6129a8..5cb6310dcb 100644 --- a/querier.go +++ b/querier.go @@ -1,5 +1,7 @@ package tsdb +import "github.com/fabxc/tsdb/chunks" + // Matcher matches a string. type Matcher interface { // Match returns true if the matcher applies to the string value. @@ -52,3 +54,68 @@ type SeriesIterator interface { // Err returns the current error. Err() error } + +// chunkSeriesIterator implements a series iterator on top +// of a list of time-sorted, non-overlapping chunks. +type chunkSeriesIterator struct { + // minTimes []int64 + chunks []chunks.Chunk + + i int + cur chunks.Iterator + err error +} + +func newChunkSeriesIterator(cs []chunks.Chunk) *chunkSeriesIterator { + return &chunkSeriesIterator{ + chunks: cs, + i: 0, + cur: cs[0].Iterator(), + } +} + +func (it *chunkSeriesIterator) Seek(t int64) (ok bool) { + // TODO(fabxc): skip to relevant chunk. + for it.Next() { + if ts, _ := it.Values(); ts >= t { + return true + } + } + return false +} + +func (it *chunkSeriesIterator) Values() (t int64, v float64) { + return it.cur.Values() +} + +func (it *chunkSeriesIterator) Next() bool { + if it.cur.Next() { + return true + } + if err := it.cur.Err(); err != nil { + return false + } + if it.i == len(it.chunks)-1 { + return false + } + + it.i++ + it.cur = it.chunks[it.i].Iterator() + + return it.Next() +} + +func (it *chunkSeriesIterator) Err() error { + return it.cur.Err() +} + +type bufferedSeriesIterator struct { + // TODO(fabxc): time-based look back buffer for time-aggregating + // queries such as rate. It should allow us to re-use an iterator + // within a range query while calculating time-aggregates at any point. + // + // It also allows looking up/seeking at-or-before without modifying + // the simpler interface. + // + // Consider making this the main external interface. +} diff --git a/reader.go b/reader.go index c4c5dd7211..ed7926c353 100644 --- a/reader.go +++ b/reader.go @@ -70,6 +70,8 @@ type StringTuples interface { } type indexReader struct { + series SeriesReader + // The underlying byte slice holding the encoded series data. b []byte @@ -83,11 +85,14 @@ var ( errInvalidFlag = fmt.Errorf("invalid flag") ) -func newIndexReader(b []byte) (*indexReader, error) { +func newIndexReader(s SeriesReader, b []byte) (*indexReader, error) { if len(b) < 16 { return nil, errInvalidSize } - r := &indexReader{b: b} + r := &indexReader{ + series: s, + b: b, + } // Verify magic number. if m := binary.BigEndian.Uint32(b[:4]); m != MagicIndex { @@ -276,13 +281,15 @@ func (r *indexReader) Series(ref uint32) (Series, error) { s := &series{ labels: labels, offsets: coffsets, + chunk: r.series.Chunk, } return s, nil } type series struct { labels Labels - offsets []ChunkOffset + offsets []ChunkOffset // in-order chunk refs + chunk func(ref uint32) (chunks.Chunk, error) } func (s *series) Labels() (Labels, error) {