diff --git a/CHANGELOG.md b/CHANGELOG.md index 9d057a3bc3..6708b48aa4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,7 @@ ## Master / unreleased + - [FEATURE] `chunckenc.Chunk.Iterator` method now takes a `chunckenc.Iterator` interface as an argument for reuse. + ## 0.9.1 - [CHANGE] LiveReader metrics are now injected rather than global. @@ -19,6 +21,7 @@ - [ENHANCEMENT] Reduced disk usage for WAL for small setups. - [ENHANCEMENT] Optimize queries using regexp for set lookups. + ## 0.8.0 - [BUGFIX] Calling `Close` more than once on a querier returns an error instead of a panic. diff --git a/chunkenc/chunk.go b/chunkenc/chunk.go index dc566606d5..5f9349f05b 100644 --- a/chunkenc/chunk.go +++ b/chunkenc/chunk.go @@ -44,7 +44,10 @@ type Chunk interface { Bytes() []byte Encoding() Encoding Appender() (Appender, error) - Iterator() Iterator + // The iterator passed as argument is for re-use. + // Depending on implementation, the iterator can + // be re-used or a new iterator can be allocated. + Iterator(Iterator) Iterator NumSamples() int } diff --git a/chunkenc/chunk_test.go b/chunkenc/chunk_test.go index 11dc19079d..f6577b429f 100644 --- a/chunkenc/chunk_test.go +++ b/chunkenc/chunk_test.go @@ -77,7 +77,7 @@ func testChunk(c Chunk) error { // fmt.Println("appended", len(c.Bytes()), c.Bytes()) } - it := c.Iterator() + it := c.Iterator(nil) var res []pair for it.Next() { ts, v := it.At() @@ -133,9 +133,10 @@ func benchmarkIterator(b *testing.B, newChunk func() Chunk) { res := make([]float64, 0, 1024) + var it Iterator for i := 0; i < len(chunks); i++ { c := chunks[i] - it := c.Iterator() + it := c.Iterator(it) for it.Next() { _, v := it.At() diff --git a/chunkenc/xor.go b/chunkenc/xor.go index 1518772b3c..ca20309f68 100644 --- a/chunkenc/xor.go +++ b/chunkenc/xor.go @@ -77,7 +77,7 @@ func (c *XORChunk) NumSamples() int { // Appender implements the Chunk interface. func (c *XORChunk) Appender() (Appender, error) { - it := c.iterator() + it := c.iterator(nil) // To get an appender we must know the state it would have if we had // appended all existing data from scratch. @@ -102,19 +102,25 @@ func (c *XORChunk) Appender() (Appender, error) { return a, nil } -func (c *XORChunk) iterator() *xorIterator { +func (c *XORChunk) iterator(it Iterator) *xorIterator { // Should iterators guarantee to act on a copy of the data so it doesn't lock append? // When using striped locks to guard access to chunks, probably yes. // Could only copy data if the chunk is not completed yet. + if xorIter, ok := it.(*xorIterator); ok { + xorIter.Reset(c.b.bytes()) + return xorIter + } return &xorIterator{ + // The first 2 bytes contain chunk headers. + // We skip that for actual samples. br: newBReader(c.b.bytes()[2:]), numTotal: binary.BigEndian.Uint16(c.b.bytes()), } } // Iterator implements the Chunk interface. -func (c *XORChunk) Iterator() Iterator { - return c.iterator() +func (c *XORChunk) Iterator(it Iterator) Iterator { + return c.iterator(it) } type xorAppender struct { @@ -243,6 +249,21 @@ func (it *xorIterator) Err() error { return it.err } +func (it *xorIterator) Reset(b []byte) { + // The first 2 bytes contain chunk headers. + // We skip that for actual samples. + it.br = newBReader(b[2:]) + it.numTotal = binary.BigEndian.Uint16(b) + + it.numRead = 0 + it.t = 0 + it.val = 0 + it.leading = 0 + it.trailing = 0 + it.tDelta = 0 + it.err = nil +} + func (it *xorIterator) Next() bool { if it.err != nil || it.numRead == it.numTotal { return false diff --git a/chunks/chunks.go b/chunks/chunks.go index 9ce8c57dae..7d1c57baff 100644 --- a/chunks/chunks.go +++ b/chunks/chunks.go @@ -245,8 +245,8 @@ func MergeChunks(a, b chunkenc.Chunk) (*chunkenc.XORChunk, error) { if err != nil { return nil, err } - ait := a.Iterator() - bit := b.Iterator() + ait := a.Iterator(nil) + bit := b.Iterator(nil) aok, bok := ait.Next(), bit.Next() for aok && bok { at, av := ait.At() diff --git a/compact.go b/compact.go index e19b7ed769..6df0256784 100644 --- a/compact.go +++ b/compact.go @@ -736,6 +736,7 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, return errors.Wrap(err, "add symbols") } + delIter := &deletedIterator{} for set.Next() { select { case <-c.ctx.Done(): @@ -788,17 +789,18 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, return err } - it := &deletedIterator{it: chk.Chunk.Iterator(), intervals: dranges} + delIter.it = chk.Chunk.Iterator(delIter.it) + delIter.intervals = dranges var ( t int64 v float64 ) - for it.Next() { - t, v = it.At() + for delIter.Next() { + t, v = delIter.At() app.Append(t, v) } - if err := it.Err(); err != nil { + if err := delIter.Err(); err != nil { return errors.Wrap(err, "iterate chunk while re-encoding") } diff --git a/head.go b/head.go index 5e2eae8581..8f513b49df 100644 --- a/head.go +++ b/head.go @@ -1185,9 +1185,9 @@ type safeChunk struct { cid int } -func (c *safeChunk) Iterator() chunkenc.Iterator { +func (c *safeChunk) Iterator(reuseIter chunkenc.Iterator) chunkenc.Iterator { c.s.Lock() - it := c.s.iterator(c.cid) + it := c.s.iterator(c.cid, reuseIter) c.s.Unlock() return it } @@ -1739,7 +1739,7 @@ func computeChunkEndTime(start, cur, max int64) int64 { return start + (max-start)/a } -func (s *memSeries) iterator(id int) chunkenc.Iterator { +func (s *memSeries) iterator(id int, it chunkenc.Iterator) chunkenc.Iterator { c := s.chunk(id) // TODO(fabxc): Work around! A querier may have retrieved a pointer to a series' chunk, // which got then garbage collected before it got accessed. @@ -1749,17 +1749,23 @@ func (s *memSeries) iterator(id int) chunkenc.Iterator { } if id-s.firstChunkID < len(s.chunks)-1 { - return c.chunk.Iterator() + return c.chunk.Iterator(it) } // Serve the last 4 samples for the last chunk from the sample buffer // as their compressed bytes may be mutated by added samples. - it := &memSafeIterator{ - Iterator: c.chunk.Iterator(), + if msIter, ok := it.(*memSafeIterator); ok { + msIter.Iterator = c.chunk.Iterator(msIter.Iterator) + msIter.i = -1 + msIter.total = c.chunk.NumSamples() + msIter.buf = s.sampleBuf + return msIter + } + return &memSafeIterator{ + Iterator: c.chunk.Iterator(it), i: -1, total: c.chunk.NumSamples(), buf: s.sampleBuf, } - return it } func (s *memSeries) head() *memChunk { diff --git a/head_test.go b/head_test.go index 55067eae37..040ae8289d 100644 --- a/head_test.go +++ b/head_test.go @@ -159,9 +159,9 @@ func TestHead_ReadWAL(t *testing.T) { testutil.Ok(t, c.Err()) return x } - testutil.Equals(t, []sample{{100, 2}, {101, 5}}, expandChunk(s10.iterator(0))) - testutil.Equals(t, []sample{{101, 6}}, expandChunk(s50.iterator(0))) - testutil.Equals(t, []sample{{100, 3}, {101, 7}}, expandChunk(s100.iterator(0))) + testutil.Equals(t, []sample{{100, 2}, {101, 5}}, expandChunk(s10.iterator(0, nil))) + testutil.Equals(t, []sample{{101, 6}}, expandChunk(s50.iterator(0, nil))) + testutil.Equals(t, []sample{{100, 3}, {101, 7}}, expandChunk(s100.iterator(0, nil))) }) } } @@ -313,11 +313,11 @@ func TestMemSeries_truncateChunks(t *testing.T) { // Validate that the series' sample buffer is applied correctly to the last chunk // after truncation. - it1 := s.iterator(s.chunkID(len(s.chunks) - 1)) + it1 := s.iterator(s.chunkID(len(s.chunks)-1), nil) _, ok := it1.(*memSafeIterator) testutil.Assert(t, ok == true, "") - it2 := s.iterator(s.chunkID(len(s.chunks) - 2)) + it2 := s.iterator(s.chunkID(len(s.chunks)-2), nil) _, ok = it2.(*memSafeIterator) testutil.Assert(t, ok == false, "non-last chunk incorrectly wrapped with sample buffer") } @@ -451,10 +451,11 @@ func TestHeadDeleteSimple(t *testing.T) { chunkr, err := h.Chunks() testutil.Ok(t, err) + var ii chunkenc.Iterator for _, meta := range chkMetas { chk, err := chunkr.Chunk(meta.Ref) testutil.Ok(t, err) - ii := chk.Iterator() + ii = chk.Iterator(ii) for ii.Next() { t, v := ii.At() actSamples = append(actSamples, sample{t: t, v: v}) diff --git a/mocks_test.go b/mocks_test.go index 243d5cf148..aa84ff0ca6 100644 --- a/mocks_test.go +++ b/mocks_test.go @@ -14,6 +14,7 @@ package tsdb import ( + "github.com/prometheus/tsdb/chunkenc" "github.com/prometheus/tsdb/chunks" "github.com/prometheus/tsdb/index" "github.com/prometheus/tsdb/labels" @@ -40,10 +41,11 @@ func (m *mockIndexWriter) AddSeries(ref uint64, l labels.Labels, chunks ...chunk i = len(m.series) - 1 } + var iter chunkenc.Iterator for _, chk := range chunks { samples := make([]sample, 0, chk.Chunk.NumSamples()) - iter := chk.Chunk.Iterator() + iter = chk.Chunk.Iterator(iter) for iter.Next() { s := sample{} s.t, s.v = iter.At() diff --git a/querier.go b/querier.go index 253102b0ed..fbd9493f4f 100644 --- a/querier.go +++ b/querier.go @@ -1060,8 +1060,9 @@ func (it *verticalMergeSeriesIterator) Err() error { type chunkSeriesIterator struct { chunks []chunks.Meta - i int - cur chunkenc.Iterator + i int + cur chunkenc.Iterator + bufDelIter *deletedIterator maxt, mint int64 @@ -1069,21 +1070,32 @@ type chunkSeriesIterator struct { } func newChunkSeriesIterator(cs []chunks.Meta, dranges Intervals, mint, maxt int64) *chunkSeriesIterator { - it := cs[0].Chunk.Iterator() - - if len(dranges) > 0 { - it = &deletedIterator{it: it, intervals: dranges} - } - return &chunkSeriesIterator{ + csi := &chunkSeriesIterator{ chunks: cs, i: 0, - cur: it, mint: mint, maxt: maxt, intervals: dranges, } + csi.resetCurIterator() + + return csi +} + +func (it *chunkSeriesIterator) resetCurIterator() { + if len(it.intervals) == 0 { + it.cur = it.chunks[it.i].Chunk.Iterator(it.cur) + return + } + if it.bufDelIter == nil { + it.bufDelIter = &deletedIterator{ + intervals: it.intervals, + } + } + it.bufDelIter.it = it.chunks[it.i].Chunk.Iterator(it.bufDelIter.it) + it.cur = it.bufDelIter } func (it *chunkSeriesIterator) Seek(t int64) (ok bool) { @@ -1102,10 +1114,7 @@ func (it *chunkSeriesIterator) Seek(t int64) (ok bool) { } } - it.cur = it.chunks[it.i].Chunk.Iterator() - if len(it.intervals) > 0 { - it.cur = &deletedIterator{it: it.cur, intervals: it.intervals} - } + it.resetCurIterator() for it.cur.Next() { t0, _ := it.cur.At() @@ -1145,10 +1154,7 @@ func (it *chunkSeriesIterator) Next() bool { } it.i++ - it.cur = it.chunks[it.i].Chunk.Iterator() - if len(it.intervals) > 0 { - it.cur = &deletedIterator{it: it.cur, intervals: it.intervals} - } + it.resetCurIterator() return it.Next() } diff --git a/querier_test.go b/querier_test.go index 252170d179..2be48fcd5b 100644 --- a/querier_test.go +++ b/querier_test.go @@ -1270,7 +1270,7 @@ func TestDeletedIterator(t *testing.T) { for _, c := range cases { i := int64(-1) - it := &deletedIterator{it: chk.Iterator(), intervals: c.r[:]} + it := &deletedIterator{it: chk.Iterator(nil), intervals: c.r[:]} ranges := c.r[:] for it.Next() { i++