Add chunk based series iterator

This commit is contained in:
Fabian Reinartz 2016-12-12 19:12:55 +01:00
parent b334c3ade8
commit 9b400b4c58
2 changed files with 77 additions and 3 deletions

View file

@ -1,5 +1,7 @@
package tsdb
import "github.com/fabxc/tsdb/chunks"
// Matcher matches a string.
type Matcher interface {
// Match returns true if the matcher applies to the string value.
@ -52,3 +54,68 @@ type SeriesIterator interface {
// Err returns the current error.
Err() error
}
// chunkSeriesIterator implements a series iterator on top
// of a list of time-sorted, non-overlapping chunks.
type chunkSeriesIterator struct {
// minTimes []int64
chunks []chunks.Chunk
i int
cur chunks.Iterator
err error
}
func newChunkSeriesIterator(cs []chunks.Chunk) *chunkSeriesIterator {
return &chunkSeriesIterator{
chunks: cs,
i: 0,
cur: cs[0].Iterator(),
}
}
func (it *chunkSeriesIterator) Seek(t int64) (ok bool) {
// TODO(fabxc): skip to relevant chunk.
for it.Next() {
if ts, _ := it.Values(); ts >= t {
return true
}
}
return false
}
func (it *chunkSeriesIterator) Values() (t int64, v float64) {
return it.cur.Values()
}
func (it *chunkSeriesIterator) Next() bool {
if it.cur.Next() {
return true
}
if err := it.cur.Err(); err != nil {
return false
}
if it.i == len(it.chunks)-1 {
return false
}
it.i++
it.cur = it.chunks[it.i].Iterator()
return it.Next()
}
func (it *chunkSeriesIterator) Err() error {
return it.cur.Err()
}
type bufferedSeriesIterator struct {
// TODO(fabxc): time-based look back buffer for time-aggregating
// queries such as rate. It should allow us to re-use an iterator
// within a range query while calculating time-aggregates at any point.
//
// It also allows looking up/seeking at-or-before without modifying
// the simpler interface.
//
// Consider making this the main external interface.
}

View file

@ -70,6 +70,8 @@ type StringTuples interface {
}
type indexReader struct {
series SeriesReader
// The underlying byte slice holding the encoded series data.
b []byte
@ -83,11 +85,14 @@ var (
errInvalidFlag = fmt.Errorf("invalid flag")
)
func newIndexReader(b []byte) (*indexReader, error) {
func newIndexReader(s SeriesReader, b []byte) (*indexReader, error) {
if len(b) < 16 {
return nil, errInvalidSize
}
r := &indexReader{b: b}
r := &indexReader{
series: s,
b: b,
}
// Verify magic number.
if m := binary.BigEndian.Uint32(b[:4]); m != MagicIndex {
@ -276,13 +281,15 @@ func (r *indexReader) Series(ref uint32) (Series, error) {
s := &series{
labels: labels,
offsets: coffsets,
chunk: r.series.Chunk,
}
return s, nil
}
type series struct {
labels Labels
offsets []ChunkOffset
offsets []ChunkOffset // in-order chunk refs
chunk func(ref uint32) (chunks.Chunk, error)
}
func (s *series) Labels() (Labels, error) {