tsdb: optimise block series iterators

Re-use previous memory if it is already of the correct type.

Also turn two levels of function closure into a single object that
holds the required data.

Signed-off-by: Bryan Boreham <bjboreham@gmail.com>
This commit is contained in:
Bryan Boreham 2022-09-20 19:27:44 +01:00
parent 3c7de69059
commit f0866c0774
2 changed files with 94 additions and 56 deletions

View file

@ -426,6 +426,16 @@ func labelNamesWithMatchers(r IndexReader, matchers ...*labels.Matcher) ([]strin
return r.LabelNamesFor(postings...)
}
// These are the things fetched when we move from one series to another.
type seriesData struct {
chks []chunks.Meta
intervals tombstones.Intervals
labels labels.Labels
}
// Labels implements part of storage.Series and storage.ChunkSeries.
func (s *seriesData) Labels() labels.Labels { return s.labels }
// blockBaseSeriesSet allows to iterate over all series in the single block.
// Iterated series are trimmed with given min and max time as well as tombstones.
// See newBlockSeriesSet and newBlockChunkSeriesSet to use it for either sample or chunk iterating.
@ -438,8 +448,7 @@ type blockBaseSeriesSet struct {
mint, maxt int64
disableTrimming bool
currIterFn func() *populateWithDelGenericSeriesIterator
currLabels labels.Labels
curr seriesData
bufChks []chunks.Meta
bufLbls labels.Labels
@ -519,12 +528,11 @@ func (b *blockBaseSeriesSet) Next() bool {
intervals = intervals.Add(tombstones.Interval{Mint: b.maxt + 1, Maxt: math.MaxInt64})
}
b.currLabels = make(labels.Labels, len(b.bufLbls))
copy(b.currLabels, b.bufLbls)
b.curr.labels = make(labels.Labels, len(b.bufLbls))
copy(b.curr.labels, b.bufLbls)
b.curr.chks = chks
b.curr.intervals = intervals
b.currIterFn = func() *populateWithDelGenericSeriesIterator {
return newPopulateWithDelGenericSeriesIterator(b.blockID, b.chunks, chks, intervals)
}
return true
}
return false
@ -556,29 +564,26 @@ type populateWithDelGenericSeriesIterator struct {
// the same, single series.
chks []chunks.Meta
i int
i int // Index into chks; -1 if not started yet.
err error
bufIter *DeletedIterator
bufIter DeletedIterator // Retained for memory re-use. currDelIter may point here.
intervals tombstones.Intervals
currDelIter chunkenc.Iterator
currChkMeta chunks.Meta
}
func newPopulateWithDelGenericSeriesIterator(
blockID ulid.ULID,
chunks ChunkReader,
chks []chunks.Meta,
intervals tombstones.Intervals,
) *populateWithDelGenericSeriesIterator {
return &populateWithDelGenericSeriesIterator{
blockID: blockID,
chunks: chunks,
chks: chks,
i: -1,
bufIter: &DeletedIterator{},
intervals: intervals,
}
func (p *populateWithDelGenericSeriesIterator) reset(blockID ulid.ULID, cr ChunkReader, chks []chunks.Meta, intervals tombstones.Intervals) {
p.blockID = blockID
p.chunks = cr
p.chks = chks
p.i = -1
p.err = nil
p.bufIter.Iter = nil
p.bufIter.Intervals = p.bufIter.Intervals[:0]
p.intervals = intervals
p.currDelIter = nil
p.currChkMeta = chunks.Meta{}
}
func (p *populateWithDelGenericSeriesIterator) next() bool {
@ -618,28 +623,55 @@ func (p *populateWithDelGenericSeriesIterator) next() bool {
// We don't want the full chunk, or it's potentially still opened, take
// just a part of it.
p.bufIter.Iter = p.currChkMeta.Chunk.Iterator(nil)
p.currDelIter = p.bufIter
p.bufIter.Iter = p.currChkMeta.Chunk.Iterator(p.bufIter.Iter)
p.currDelIter = &p.bufIter
return true
}
func (p *populateWithDelGenericSeriesIterator) Err() error { return p.err }
func (p *populateWithDelGenericSeriesIterator) toSeriesIterator() chunkenc.Iterator {
return &populateWithDelSeriesIterator{populateWithDelGenericSeriesIterator: p}
type blockSeriesEntry struct {
chunks ChunkReader
blockID ulid.ULID
seriesData
}
func (p *populateWithDelGenericSeriesIterator) toChunkSeriesIterator() chunks.Iterator {
return &populateWithDelChunkSeriesIterator{populateWithDelGenericSeriesIterator: p}
func (s *blockSeriesEntry) Iterator(it chunkenc.Iterator) chunkenc.Iterator {
pi, ok := it.(*populateWithDelSeriesIterator)
if !ok {
pi = &populateWithDelSeriesIterator{}
}
pi.reset(s.blockID, s.chunks, s.chks, s.intervals)
return pi
}
type chunkSeriesEntry struct {
chunks ChunkReader
blockID ulid.ULID
seriesData
}
func (s *chunkSeriesEntry) Iterator(it chunks.Iterator) chunks.Iterator {
pi, ok := it.(*populateWithDelChunkSeriesIterator)
if !ok {
pi = &populateWithDelChunkSeriesIterator{}
}
pi.reset(s.blockID, s.chunks, s.chks, s.intervals)
return pi
}
// populateWithDelSeriesIterator allows to iterate over samples for the single series.
type populateWithDelSeriesIterator struct {
*populateWithDelGenericSeriesIterator
populateWithDelGenericSeriesIterator
curr chunkenc.Iterator
}
func (p *populateWithDelSeriesIterator) reset(blockID ulid.ULID, cr ChunkReader, chks []chunks.Meta, intervals tombstones.Intervals) {
p.populateWithDelGenericSeriesIterator.reset(blockID, cr, chks, intervals)
p.curr = nil
}
func (p *populateWithDelSeriesIterator) Next() chunkenc.ValueType {
if p.curr != nil {
if valueType := p.curr.Next(); valueType != chunkenc.ValNone {
@ -701,11 +733,16 @@ func (p *populateWithDelSeriesIterator) Err() error {
}
type populateWithDelChunkSeriesIterator struct {
*populateWithDelGenericSeriesIterator
populateWithDelGenericSeriesIterator
curr chunks.Meta
}
func (p *populateWithDelChunkSeriesIterator) reset(blockID ulid.ULID, cr ChunkReader, chks []chunks.Meta, intervals tombstones.Intervals) {
p.populateWithDelGenericSeriesIterator.reset(blockID, cr, chks, intervals)
p.curr = chunks.Meta{}
}
func (p *populateWithDelChunkSeriesIterator) Next() bool {
if !p.next() {
return false
@ -834,13 +871,11 @@ func newBlockSeriesSet(i IndexReader, c ChunkReader, t tombstones.Reader, p inde
}
func (b *blockSeriesSet) At() storage.Series {
// At can be looped over before iterating, so save the current value locally.
currIterFn := b.currIterFn
return &storage.SeriesEntry{
Lset: b.currLabels,
SampleIteratorFn: func(chunkenc.Iterator) chunkenc.Iterator {
return currIterFn().toSeriesIterator()
},
// At can be looped over before iterating, so save the current values locally.
return &blockSeriesEntry{
chunks: b.chunks,
blockID: b.blockID,
seriesData: b.curr,
}
}
@ -868,13 +903,11 @@ func newBlockChunkSeriesSet(id ulid.ULID, i IndexReader, c ChunkReader, t tombst
}
func (b *blockChunkSeriesSet) At() storage.ChunkSeries {
// At can be looped over before iterating, so save the current value locally.
currIterFn := b.currIterFn
return &storage.ChunkSeriesEntry{
Lset: b.currLabels,
ChunkIteratorFn: func(chunks.Iterator) chunks.Iterator {
return currIterFn().toChunkSeriesIterator()
},
// At can be looped over before iterating, so save the current values locally.
return &chunkSeriesEntry{
chunks: b.chunks,
blockID: b.blockID,
seriesData: b.curr,
}
}

View file

@ -859,7 +859,8 @@ func TestPopulateWithTombSeriesIterators(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
t.Run("sample", func(t *testing.T) {
f, chkMetas := createFakeReaderAndNotPopulatedChunks(tc.chks...)
it := newPopulateWithDelGenericSeriesIterator(ulid.ULID{}, f, chkMetas, tc.intervals).toSeriesIterator()
it := &populateWithDelSeriesIterator{}
it.reset(ulid.ULID{}, f, chkMetas, tc.intervals)
var r []tsdbutil.Sample
if tc.seek != 0 {
@ -879,7 +880,8 @@ func TestPopulateWithTombSeriesIterators(t *testing.T) {
})
t.Run("chunk", func(t *testing.T) {
f, chkMetas := createFakeReaderAndNotPopulatedChunks(tc.chks...)
it := newPopulateWithDelGenericSeriesIterator(ulid.ULID{}, f, chkMetas, tc.intervals).toChunkSeriesIterator()
it := &populateWithDelChunkSeriesIterator{}
it.reset(ulid.ULID{}, f, chkMetas, tc.intervals)
if tc.seek != 0 {
// Chunk iterator does not have Seek method.
@ -911,7 +913,8 @@ func TestPopulateWithDelSeriesIterator_DoubleSeek(t *testing.T) {
[]tsdbutil.Sample{sample{4, 4, nil, nil}, sample{5, 5, nil, nil}},
)
it := newPopulateWithDelGenericSeriesIterator(ulid.ULID{}, f, chkMetas, nil).toSeriesIterator()
it := &populateWithDelSeriesIterator{}
it.reset(ulid.ULID{}, f, chkMetas, nil)
require.Equal(t, chunkenc.ValFloat, it.Seek(1))
require.Equal(t, chunkenc.ValFloat, it.Seek(2))
require.Equal(t, chunkenc.ValFloat, it.Seek(2))
@ -929,7 +932,8 @@ func TestPopulateWithDelSeriesIterator_SeekInCurrentChunk(t *testing.T) {
[]tsdbutil.Sample{},
)
it := newPopulateWithDelGenericSeriesIterator(ulid.ULID{}, f, chkMetas, nil).toSeriesIterator()
it := &populateWithDelSeriesIterator{}
it.reset(ulid.ULID{}, f, chkMetas, nil)
require.Equal(t, chunkenc.ValFloat, it.Next())
ts, v := it.At()
require.Equal(t, int64(1), ts)
@ -946,7 +950,8 @@ func TestPopulateWithDelSeriesIterator_SeekWithMinTime(t *testing.T) {
[]tsdbutil.Sample{sample{1, 6, nil, nil}, sample{5, 6, nil, nil}, sample{6, 8, nil, nil}},
)
it := newPopulateWithDelGenericSeriesIterator(ulid.ULID{}, f, chkMetas, nil).toSeriesIterator()
it := &populateWithDelSeriesIterator{}
it.reset(ulid.ULID{}, f, chkMetas, nil)
require.Equal(t, chunkenc.ValNone, it.Seek(7))
require.Equal(t, chunkenc.ValFloat, it.Seek(3))
}
@ -958,9 +963,8 @@ func TestPopulateWithDelSeriesIterator_NextWithMinTime(t *testing.T) {
[]tsdbutil.Sample{sample{1, 6, nil, nil}, sample{5, 6, nil, nil}, sample{7, 8, nil, nil}},
)
it := newPopulateWithDelGenericSeriesIterator(
ulid.ULID{}, f, chkMetas, tombstones.Intervals{{Mint: math.MinInt64, Maxt: 2}}.Add(tombstones.Interval{Mint: 4, Maxt: math.MaxInt64}),
).toSeriesIterator()
it := &populateWithDelSeriesIterator{}
it.reset(ulid.ULID{}, f, chkMetas, tombstones.Intervals{{Mint: math.MinInt64, Maxt: 2}}.Add(tombstones.Interval{Mint: 4, Maxt: math.MaxInt64}))
require.Equal(t, chunkenc.ValNone, it.Next())
}
@ -2225,11 +2229,12 @@ func TestBlockBaseSeriesSet(t *testing.T) {
i := 0
for bcs.Next() {
chks := bcs.currIterFn().chks
si := populateWithDelGenericSeriesIterator{}
si.reset(bcs.blockID, bcs.chunks, bcs.curr.chks, bcs.curr.intervals)
idx := tc.expIdxs[i]
require.Equal(t, tc.series[idx].lset, bcs.currLabels)
require.Equal(t, tc.series[idx].chunks, chks)
require.Equal(t, tc.series[idx].lset, bcs.curr.labels)
require.Equal(t, tc.series[idx].chunks, si.chks)
i++
}