diff --git a/db.go b/db.go index e9abe73e7..42f01b84a 100644 --- a/db.go +++ b/db.go @@ -293,6 +293,10 @@ func intervalOverlap(amin, amax, bmin, bmax int64) bool { return false } +func intervalContains(min, max, t int64) bool { + return t >= min && t <= max +} + // blocksForRange returns all blocks within the shard that may contain // data for the given time range. func (s *Shard) blocksForInterval(mint, maxt int64) []block { @@ -312,6 +316,8 @@ func (s *Shard) blocksForInterval(mint, maxt int64) []block { bs = append(bs, s.head) } + fmt.Println("blocks for interval", bs) + return bs } @@ -393,6 +399,7 @@ type chunkDesc struct { chunk chunks.Chunk // Caching fields. + firsTimestamp int64 lastTimestamp int64 lastValue float64 @@ -405,6 +412,7 @@ func (cd *chunkDesc) append(ts int64, v float64) (err error) { if err != nil { return err } + cd.firsTimestamp = ts } if err := cd.app.Append(ts, v); err != nil { return err diff --git a/head.go b/head.go index c27d72958..a358a8e1c 100644 --- a/head.go +++ b/head.go @@ -76,15 +76,18 @@ func (h *HeadBlock) Postings(name, value string) (Postings, error) { } // Series returns the series for the given reference. -func (h *HeadBlock) Series(ref uint32) (Series, error) { +func (h *HeadBlock) Series(ref uint32, mint, maxt int64) (Series, error) { cd, ok := h.index.forward[ref] if !ok { return nil, errNotFound } + if !intervalOverlap(cd.firsTimestamp, cd.lastTimestamp, mint, maxt) { + return nil, nil + } s := &series{ labels: cd.lset, - offsets: []ChunkOffset{ - {Value: h.stats.MinTime, Offset: 0}, + chunks: []ChunkMeta{ + {MinTime: h.stats.MinTime, Ref: 0}, }, chunk: func(ref uint32) (chunks.Chunk, error) { return cd.chunk, nil diff --git a/querier.go b/querier.go index 3a970d740..33141c32a 100644 --- a/querier.go +++ b/querier.go @@ -47,16 +47,13 @@ type Querier interface { type Series interface { // Labels returns the complete set of labels identifying the series. Labels() Labels + // Iterator returns a new iterator of the data of the series. Iterator() SeriesIterator // Ref() uint32 } -func inRange(x, mint, maxt int64) bool { - return x >= mint && x <= maxt -} - // querier merges query results from a set of shard querieres. type querier struct { mint, maxt int64 @@ -164,6 +161,8 @@ func (q *blockQuerier) Select(ms ...Matcher) SeriesSet { return &blockSeriesSet{ index: q.index, it: Intersect(its...), + mint: q.mint, + maxt: q.maxt, } } @@ -379,34 +378,67 @@ func (s *shardSeriesSet) Next() bool { // blockSeriesSet is a set of series from an inverted index query. type blockSeriesSet struct { - index IndexReader - it Postings + index IndexReader + it Postings + mint, maxt int64 err error cur Series } func (s *blockSeriesSet) Next() bool { - // Get next reference from postings iterator. - if !s.it.Next() { + // Step through the postings iterator to find potential series. + // Resolving series may return nil if no applicable data for the + // time range exists and we can skip to the next series. + for s.it.Next() { + series, err := s.index.Series(s.it.Value(), s.mint, s.maxt) + if err != nil { + s.err = err + return false + } + if series != nil { + s.cur = series + return true + } + } + if s.it.Err() != nil { s.err = s.it.Err() - return false } - - // Resolve reference to series. - series, err := s.index.Series(s.it.Value()) - if err != nil { - s.err = err - return false - } - - s.cur = series - return true + return false } func (s *blockSeriesSet) Series() Series { return s.cur } func (s *blockSeriesSet) Err() error { return s.err } +type series struct { + labels Labels + chunks []ChunkMeta // in-order chunk refs + + chunk func(ref uint32) (chunks.Chunk, error) +} + +func (s *series) Labels() Labels { + return s.labels +} + +func (s *series) Iterator() SeriesIterator { + var cs []chunks.Chunk + var mints []int64 + + for _, co := range s.chunks { + c, err := s.chunk(co.Ref) + if err != nil { + panic(err) // TODO(fabxc): add error series iterator. + } + cs = append(cs, c) + mints = append(mints, co.MinTime) + } + + // TODO(fabxc): consider pushing chunk retrieval further down. In practice, we + // probably have to touch all chunks anyway and it doesn't matter. + return newChunkSeriesIterator(mints, cs) +} + // SeriesIterator iterates over the data of a time series. type SeriesIterator interface { // Seek advances the iterator forward to the given timestamp. @@ -421,6 +453,7 @@ type SeriesIterator interface { Err() error } +// chainedSeries implements a series for a list of time-sorted series. type chainedSeries struct { series []Series } @@ -430,46 +463,29 @@ func (s *chainedSeries) Labels() Labels { } func (s *chainedSeries) Iterator() SeriesIterator { - it := &chainedSeriesIterator{ - series: make([]SeriesIterator, 0, len(s.series)), - } - for _, series := range s.series { - it.series = append(it.series, series.Iterator()) - } - return it + return &chainedSeriesIterator{series: s.series} } // chainedSeriesIterator implements a series iterater over a list // of time-sorted, non-overlapping iterators. type chainedSeriesIterator struct { - mints []int64 // minimum timestamps for each iterator - series []SeriesIterator // iterators in time order + series []Series // series in time order i int cur SeriesIterator } func (it *chainedSeriesIterator) Seek(t int64) bool { - x := sort.Search(len(it.mints), func(i int) bool { return it.mints[i] >= t }) - - if x == len(it.mints) { - return false - } - if it.mints[x] == t { - if x == 0 { - return false - } - x-- - } - - it.i = x - it.cur = it.series[x] - - for it.cur.Next() { - t0, _ := it.cur.Values() - if t0 >= t { - break + // We just scan the chained series sequentially as they are already + // pre-selected by relevant time and should be accessed sequentially anyway. + for i, s := range it.series[it.i:] { + cur := s.Iterator() + if !cur.Seek(t) { + continue } + it.cur = cur + it.i += i + return true } return false } @@ -486,7 +502,7 @@ func (it *chainedSeriesIterator) Next() bool { } it.i++ - it.cur = it.series[it.i] + it.cur = it.series[it.i].Iterator() return it.Next() } @@ -509,8 +525,12 @@ type chunkSeriesIterator struct { cur chunks.Iterator } -func newChunkSeriesIterator(cs []chunks.Chunk) *chunkSeriesIterator { +func newChunkSeriesIterator(mints []int64, cs []chunks.Chunk) *chunkSeriesIterator { + if len(mints) != len(cs) { + panic("chunk references and chunks length don't match") + } return &chunkSeriesIterator{ + mints: mints, chunks: cs, i: 0, cur: cs[0].Iterator(), @@ -536,7 +556,7 @@ func (it *chunkSeriesIterator) Seek(t int64) (ok bool) { for it.cur.Next() { t0, _ := it.cur.Values() if t0 >= t { - break + return true } } return false diff --git a/reader.go b/reader.go index 73e88de33..ff5beb8a1 100644 --- a/reader.go +++ b/reader.go @@ -58,7 +58,7 @@ type IndexReader interface { Postings(name, value string) (Postings, error) // Series returns the series for the given reference. - Series(ref uint32) (Series, error) + Series(ref uint32, mint, maxt int64) (Series, error) } // StringTuples provides access to a sorted list of string tuples. @@ -231,7 +231,7 @@ func (r *indexReader) LabelValues(names ...string) (StringTuples, error) { return st, nil } -func (r *indexReader) Series(ref uint32) (Series, error) { +func (r *indexReader) Series(ref uint32, mint, maxt int64) (Series, error) { k, n := binary.Uvarint(r.b[ref:]) if n < 1 { return nil, errInvalidSize @@ -249,13 +249,15 @@ func (r *indexReader) Series(ref uint32) (Series, error) { b = b[n:] } - // Offests must occur in pairs representing name and value. + // Symbol offests must occur in pairs representing name and value. if len(offsets)&1 != 0 { return nil, errInvalidSize } - // TODO(fabxc): Fully materialize series for now. Figure out later if it + // TODO(fabxc): Fully materialize series symbols for now. Figure out later if it // makes sense to decode those lazily. + // If we use unsafe strings the there'll be no copy overhead. + // // The references are expected to be sorted and match the order of // the underlying strings. labels := make(Labels, 0, k) @@ -275,17 +277,28 @@ func (r *indexReader) Series(ref uint32) (Series, error) { }) } - // Read the chunk offsets. - k, n = binary.Uvarint(r.b[ref:]) + // Read the chunks meta data. + k, n = binary.Uvarint(b) if n < 1 { return nil, errInvalidSize } b = b[n:] - coffsets := make([]ChunkOffset, 0, k) + chunks := make([]ChunkMeta, 0, k) for i := 0; i < int(k); i++ { - v, n := binary.Varint(b) + firstTime, n := binary.Varint(b) + if n < 1 { + return nil, errInvalidSize + } + b = b[n:] + + // Terminate early if we exceeded the queried time range. + if firstTime > maxt { + break + } + + lastTime, n := binary.Varint(b) if n < 1 { return nil, errInvalidSize } @@ -297,18 +310,28 @@ func (r *indexReader) Series(ref uint32) (Series, error) { } b = b[n:] - coffsets = append(coffsets, ChunkOffset{ - Offset: uint32(o), - Value: v, + // Skip the chunk if it is before the queried time range. + if lastTime < mint { + continue + } + + chunks = append(chunks, ChunkMeta{ + Ref: uint32(o), + MinTime: firstTime, + MaxTime: lastTime, }) } - - s := &series{ - labels: labels, - offsets: coffsets, - chunk: r.series.Chunk, + // If no chunks applicable to the time range were found, the series + // can be skipped. + if len(chunks) == 0 { + return nil, nil } - return s, nil + + return &series{ + labels: labels, + chunks: chunks, + chunk: r.series.Chunk, + }, nil } func (r *indexReader) Postings(name, value string) (Postings, error) { @@ -344,30 +367,6 @@ func (r *indexReader) Postings(name, value string) (Postings, error) { return &listIterator{list: l, idx: -1}, nil } -type series struct { - labels Labels - offsets []ChunkOffset // in-order chunk refs - chunk func(ref uint32) (chunks.Chunk, error) -} - -func (s *series) Labels() Labels { - return s.labels -} - -func (s *series) Iterator() SeriesIterator { - var cs []chunks.Chunk - - for _, co := range s.offsets { - c, err := s.chunk(co.Offset) - if err != nil { - panic(err) // TODO(fabxc): add error series iterator. - } - cs = append(cs, c) - } - - return newChunkSeriesIterator(cs) -} - type stringTuples struct { l int // tuple length s []string // flattened tuple entries diff --git a/test/labels_test.go b/test/labels_test.go index a802009b7..7349a75c0 100644 --- a/test/labels_test.go +++ b/test/labels_test.go @@ -2,6 +2,7 @@ package test import ( "bytes" + "crypto/rand" "testing" "github.com/fabxc/tsdb" @@ -55,40 +56,64 @@ func BenchmarkLabelSetAccess(b *testing.B) { } func BenchmarkStringBytesEquals(b *testing.B) { + randBytes := func(n int) ([]byte, []byte) { + buf1 := make([]byte, n) + if _, err := rand.Read(buf1); err != nil { + b.Fatal(err) + } + buf2 := make([]byte, n) + copy(buf1, buf2) + + return buf1, buf2 + } + cases := []struct { name string - a, b string + f func() ([]byte, []byte) }{ { name: "equal", - a: "sdfn492cn9xwm0ws8r,4932x98f,uj594cxf594802h875hgzz0h3586x8xz,359", - b: "sdfn492cn9xwm0ws8r,4932x98f,uj594cxf594802h875hgzz0h3586x8xz,359", + f: func() ([]byte, []byte) { + return randBytes(60) + }, }, { name: "1-flip-end", - a: "sdfn492cn9xwm0ws8r,4932x98f,uj594cxf594802h875hgzz0h3586x8xz,359", - b: "sdfn492cn9xwm0ws8r,4932x98f,uj594cxf594802h875hgzz0h3586x8xz,353", + f: func() ([]byte, []byte) { + b1, b2 := randBytes(60) + b2[59] ^= b2[59] + return b1, b2 + }, }, { name: "1-flip-middle", - a: "sdfn492cn9xwm0ws8r,4932x98f,uj594cxf594802h875hgzz0h3586x8xz,359", - b: "sdfn492cn9xwm0ws8r,4932x98f,uj504cxf594802h875hgzz0h3586x8xz,359", + f: func() ([]byte, []byte) { + b1, b2 := randBytes(60) + b2[29] ^= b2[29] + return b1, b2 + }, }, { name: "1-flip-start", - a: "sdfn492cn9xwm0ws8r,4932x98f,uj594cxf594802h875hgzz0h3586x8xz,359", - b: "adfn492cn9xwm0ws8r,4932x98f,uj594cxf594802h875hgzz0h3586x8xz,359", + f: func() ([]byte, []byte) { + b1, b2 := randBytes(60) + b2[0] ^= b2[0] + return b1, b2 + }, }, { name: "different-length", - a: "sdfn492cn9xwm0ws8r,4932x98f,uj594cxf594802h875hgzz0h3586x8xz,359", - b: "sdfn492cn9xwm0ws8r,4932x98f,uj594cxf594802h875hgzz0h3586x8xz,35", + f: func() ([]byte, []byte) { + b1, b2 := randBytes(60) + return b1, b2[:59] + }, }, } for _, c := range cases { b.Run(c.name+"-strings", func(b *testing.B) { - as, bs := c.a, c.b + ab, bb := c.f() + as, bs := string(ab), string(bb) b.SetBytes(int64(len(as))) var r bool @@ -100,7 +125,7 @@ func BenchmarkStringBytesEquals(b *testing.B) { }) b.Run(c.name+"-bytes", func(b *testing.B) { - ab, bb := []byte(c.a), []byte(c.b) + ab, bb := c.f() b.SetBytes(int64(len(ab))) var r bool @@ -112,7 +137,7 @@ func BenchmarkStringBytesEquals(b *testing.B) { }) b.Run(c.name+"-bytes-length-check", func(b *testing.B) { - ab, bb := []byte(c.a), []byte(c.b) + ab, bb := c.f() b.SetBytes(int64(len(ab))) var r bool diff --git a/writer.go b/writer.go index 83f8449ee..72dbdf8a3 100644 --- a/writer.go +++ b/writer.go @@ -92,13 +92,13 @@ func (w *seriesWriter) WriteSeries(ref uint32, lset Labels, chks []*chunkDesc) e return err } - offsets := make([]ChunkOffset, 0, len(chks)) - lastTimestamp := w.baseTimestamp + metas := make([]ChunkMeta, 0, len(chks)) for _, cd := range chks { - offsets = append(offsets, ChunkOffset{ - Value: lastTimestamp, - Offset: uint32(w.n), + metas = append(metas, ChunkMeta{ + MinTime: cd.firsTimestamp, + MaxTime: cd.lastTimestamp, + Ref: uint32(w.n), }) n = binary.PutUvarint(b[:], uint64(len(cd.chunk.Bytes()))) @@ -111,7 +111,6 @@ func (w *seriesWriter) WriteSeries(ref uint32, lset Labels, chks []*chunkDesc) e if err := w.write(wr, cd.chunk.Bytes()); err != nil { return err } - lastTimestamp = cd.lastTimestamp } if err := w.write(w.w, h.Sum(nil)); err != nil { @@ -119,7 +118,7 @@ func (w *seriesWriter) WriteSeries(ref uint32, lset Labels, chks []*chunkDesc) e } if w.index != nil { - w.index.AddSeries(ref, lset, offsets...) + w.index.AddSeries(ref, lset, metas...) } return nil } @@ -141,9 +140,10 @@ func (w *seriesWriter) Close() error { return nil } -type ChunkOffset struct { - Value int64 - Offset uint32 +type ChunkMeta struct { + Ref uint32 + MinTime int64 + MaxTime int64 } type BlockStats struct { @@ -161,7 +161,7 @@ type IndexWriter interface { // of chunks that the index can reference. // The reference number is used to resolve a series against the postings // list iterator. It only has to be available during the write processing. - AddSeries(ref uint32, l Labels, o ...ChunkOffset) + AddSeries(ref uint32, l Labels, chunks ...ChunkMeta) // WriteStats writes final stats for the indexed block. WriteStats(BlockStats) error @@ -183,8 +183,8 @@ type IndexWriter interface { type indexWriterSeries struct { labels Labels - chunks []ChunkOffset // series file offset of chunks - offset uint32 // index file offset of series reference + chunks []ChunkMeta // series file offset of chunks + offset uint32 // index file offset of series reference } // indexWriter implements the IndexWriter interface for the standard @@ -242,7 +242,7 @@ func (w *indexWriter) writeMeta() error { return w.write(w.w, b[:]) } -func (w *indexWriter) AddSeries(ref uint32, lset Labels, offsets ...ChunkOffset) { +func (w *indexWriter) AddSeries(ref uint32, lset Labels, chunks ...ChunkMeta) { // Populate the symbol table from all label sets we have to reference. for _, l := range lset { w.symbols[l.Name] = 0 @@ -251,7 +251,7 @@ func (w *indexWriter) AddSeries(ref uint32, lset Labels, offsets ...ChunkOffset) w.series[ref] = &indexWriterSeries{ labels: lset, - chunks: offsets, + chunks: chunks, } } @@ -332,15 +332,17 @@ func (w *indexWriter) writeSeries() error { b = append(b, buf[:n]...) } - // Write skiplist to chunk offsets. + // Write chunks meta data including reference into chunk file. n = binary.PutUvarint(buf, uint64(len(s.chunks))) b = append(b, buf[:n]...) for _, c := range s.chunks { - n = binary.PutVarint(buf, c.Value) + n = binary.PutVarint(buf, c.MinTime) + b = append(b, buf[:n]...) + n = binary.PutVarint(buf, c.MaxTime) b = append(b, buf[:n]...) - n = binary.PutUvarint(buf, uint64(c.Offset)) + n = binary.PutUvarint(buf, uint64(c.Ref)) b = append(b, buf[:n]...) } }