Merge pull request #11334 from bboreham/tsdb-reuse-iterators

tsdb: reuse iterators to save garbage [INTERFACE CHANGE]
Bryan Boreham, 2022-12-15 19:37:34 +00:00, committed by GitHub
commit 3543257d93
26 changed files with 353 additions and 195 deletions
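
The change at the heart of this PR: storage.Series.Iterator and storage.ChunkSeries.Iterator now take an iterator argument to re-use instead of always allocating a new one, so callers that loop over many series can recycle a single iterator and generate less garbage. Passing nil keeps the old always-allocate behaviour. A minimal caller-side sketch of the pattern the diffs below adopt (drainFloats is an illustrative helper, not code from this PR):

package example

import (
	"github.com/prometheus/prometheus/storage"
	"github.com/prometheus/prometheus/tsdb/chunkenc"
)

// drainFloats walks every float sample in a SeriesSet, recycling one
// chunkenc.Iterator across all series via the new Iterator(it) signature.
func drainFloats(ss storage.SeriesSet) (int, error) {
	var it chunkenc.Iterator // re-used for every series in the set
	n := 0
	for ss.Next() {
		it = ss.At().Iterator(it) // hand the previous iterator back for re-use
		for it.Next() == chunkenc.ValFloat {
			it.At()
			n++
		}
		if err := it.Err(); err != nil {
			return n, err
		}
	}
	return n, ss.Err()
}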


@ -49,7 +49,7 @@ func queryAllSeries(t testing.TB, q storage.Querier, expectedMinTime, expectedMa
samples := []backfillSample{}
for ss.Next() {
series := ss.At()
it := series.Iterator()
it := series.Iterator(nil)
require.NoError(t, it.Err())
for it.Next() == chunkenc.ValFloat {
ts, v := it.At()


@ -139,7 +139,7 @@ func TestBackfillRuleIntegration(t *testing.T) {
} else {
require.Equal(t, 3, len(series.Labels()))
}
it := series.Iterator()
it := series.Iterator(nil)
for it.Next() == chunkenc.ValFloat {
samplesCount++
ts, v := it.At()


@ -644,7 +644,7 @@ func dumpSamples(path string, mint, maxt int64) (err error) {
for ss.Next() {
series := ss.At()
lbs := series.Labels()
it := series.Iterator()
it := series.Iterator(nil)
for it.Next() == chunkenc.ValFloat {
ts, val := it.At()
fmt.Printf("%s %g %d\n", lbs, val, ts)


@ -1393,10 +1393,12 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, storage.Warnings) {
enh := &EvalNodeHelper{Out: make(Vector, 0, 1)}
// Process all the calls for one time series at a time.
it := storage.NewBuffer(selRange)
var chkIter chunkenc.Iterator
for i, s := range selVS.Series {
ev.currentSamples -= len(points)
points = points[:0]
it.Reset(s.Iterator())
chkIter = s.Iterator(chkIter)
it.Reset(chkIter)
metric := selVS.Series[i].Labels()
// The last_over_time function acts like offset; thus, it
// should keep the metric name. For all the other range
@ -1578,8 +1580,10 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, storage.Warnings) {
}
mat := make(Matrix, 0, len(e.Series))
it := storage.NewMemoizedEmptyIterator(durationMilliseconds(ev.lookbackDelta))
var chkIter chunkenc.Iterator
for i, s := range e.Series {
it.Reset(s.Iterator())
chkIter = s.Iterator(chkIter)
it.Reset(chkIter)
ss := Series{
Metric: e.Series[i].Labels(),
Points: getPointSlice(numSteps),
@ -1723,8 +1727,10 @@ func (ev *evaluator) vectorSelector(node *parser.VectorSelector, ts int64) (Vect
}
vec := make(Vector, 0, len(node.Series))
it := storage.NewMemoizedEmptyIterator(durationMilliseconds(ev.lookbackDelta))
var chkIter chunkenc.Iterator
for i, s := range node.Series {
it.Reset(s.Iterator())
chkIter = s.Iterator(chkIter)
it.Reset(chkIter)
t, v, h, ok := ev.vectorSelectorSingle(it, node, ts)
if ok {
@ -1812,12 +1818,14 @@ func (ev *evaluator) matrixSelector(node *parser.MatrixSelector) (Matrix, storag
ev.error(errWithWarnings{fmt.Errorf("expanding series: %w", err), ws})
}
var chkIter chunkenc.Iterator
series := vs.Series
for i, s := range series {
if err := contextDone(ev.ctx, "expression evaluation"); err != nil {
ev.error(err)
}
it.Reset(s.Iterator())
chkIter = s.Iterator(chkIter)
it.Reset(chkIter)
ss := Series{
Metric: series[i].Labels(),
}
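
In the engine hunks above, the underlying chunk iterator is hoisted out of each per-series loop and the buffered or memoized wrapper is Reset onto it, so one iterator is recycled across all series of a selector instead of one being allocated per series. A rough sketch of that shape (selRange and the series slice stand in for the evaluator's state; not code from this PR):

package example

import (
	"github.com/prometheus/prometheus/storage"
	"github.com/prometheus/prometheus/tsdb/chunkenc"
)

// sumOverSeries recycles one chunkenc.Iterator across series and resets the
// buffered wrapper onto it each time, as eval does above.
func sumOverSeries(series []storage.Series, selRange int64) float64 {
	buf := storage.NewBuffer(selRange)
	var chkIter chunkenc.Iterator
	sum := 0.0
	for _, s := range series {
		chkIter = s.Iterator(chkIter) // re-use the previous series' iterator
		buf.Reset(chkIter)
		for buf.Next() == chunkenc.ValFloat {
			_, v := buf.At()
			sum += v
		}
	}
	return sum
}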


@ -143,7 +143,7 @@ func TestLazyLoader_WithSamplesTill(t *testing.T) {
got := Series{
Metric: storageSeries.Labels(),
}
it := storageSeries.Iterator()
it := storageSeries.Iterator(nil)
for it.Next() == chunkenc.ValFloat {
t, v := it.At()
got.Points = append(got.Points, Point{T: t, V: v})


@ -363,7 +363,11 @@ func (ss *StorageSeries) Labels() labels.Labels {
}
// Iterator returns a new iterator of the data of the series.
func (ss *StorageSeries) Iterator() chunkenc.Iterator {
func (ss *StorageSeries) Iterator(it chunkenc.Iterator) chunkenc.Iterator {
if ssi, ok := it.(*storageSeriesIterator); ok {
ssi.reset(ss.series)
return ssi
}
return newStorageSeriesIterator(ss.series)
}
@ -379,6 +383,11 @@ func newStorageSeriesIterator(series Series) *storageSeriesIterator {
}
}
func (ssi *storageSeriesIterator) reset(series Series) {
ssi.points = series.Points
ssi.curr = -1
}
func (ssi *storageSeriesIterator) Seek(t int64) chunkenc.ValueType {
i := ssi.curr
if i < 0 {

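Producer side, the recurring implementation pattern in this PR is visible in the StorageSeries hunk above: type-assert the iterator handed in, reset it in place when it is the expected concrete type, and fall back to allocating otherwise. A self-contained sketch of that shape, with a simplified iterator interface standing in for chunkenc.Iterator (pointsIterator and series are illustrative types, not from this PR):

package example

type Iterator interface {
	Next() bool
	At() (int64, float64)
}

type point struct {
	t int64
	v float64
}

type pointsIterator struct {
	points []point
	curr   int
}

func (it *pointsIterator) Next() bool {
	it.curr++
	return it.curr < len(it.points)
}

func (it *pointsIterator) At() (int64, float64) {
	p := it.points[it.curr]
	return p.t, p.v
}

// reset re-points an existing iterator at new data instead of allocating.
func (it *pointsIterator) reset(points []point) {
	it.points = points
	it.curr = -1
}

type series struct{ points []point }

// Iterator recycles it when it is already a *pointsIterator, mirroring how
// StorageSeries re-uses a *storageSeriesIterator in the hunk above.
func (s *series) Iterator(it Iterator) Iterator {
	if pi, ok := it.(*pointsIterator); ok {
		pi.reset(s.points)
		return pi
	}
	return &pointsIterator{points: s.points, curr: -1}
}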

@ -807,7 +807,7 @@ func (g *Group) RestoreForState(ts time.Time) {
// Series found for the 'for' state.
var t int64
var v float64
it := s.Iterator()
it := s.Iterator(nil)
for it.Next() == chunkenc.ValFloat {
t, v = it.At()
}


@ -592,12 +592,13 @@ func TestStaleness(t *testing.T) {
// Convert a SeriesSet into a form usable with require.Equal.
func readSeriesSet(ss storage.SeriesSet) (map[string][]promql.Point, error) {
result := map[string][]promql.Point{}
var it chunkenc.Iterator
for ss.Next() {
series := ss.At()
points := []promql.Point{}
it := series.Iterator()
it := series.Iterator(it)
for it.Next() == chunkenc.ValFloat {
t, v := it.At()
points = append(points, promql.Point{T: t, V: v})


@ -2959,7 +2959,7 @@ func TestScrapeReportSingleAppender(t *testing.T) {
c := 0
for series.Next() {
i := series.At().Iterator()
i := series.At().Iterator(nil)
for i.Next() != chunkenc.ValNone {
c++
}
@ -3032,7 +3032,7 @@ func TestScrapeReportLimit(t *testing.T) {
var found bool
for series.Next() {
i := series.At().Iterator()
i := series.At().Iterator(nil)
for i.Next() == chunkenc.ValFloat {
_, v := i.At()
require.Equal(t, 1.0, v)


@ -86,11 +86,12 @@ func TestFanout_SelectSorted(t *testing.T) {
result := make(map[int64]float64)
var labelsResult labels.Labels
var iterator chunkenc.Iterator
for seriesSet.Next() {
series := seriesSet.At()
seriesLabels := series.Labels()
labelsResult = seriesLabels
iterator := series.Iterator()
iterator := series.Iterator(iterator)
for iterator.Next() == chunkenc.ValFloat {
timestamp, value := iterator.At()
result[timestamp] = value
@ -112,11 +113,12 @@ func TestFanout_SelectSorted(t *testing.T) {
result := make(map[int64]float64)
var labelsResult labels.Labels
var iterator chunkenc.Iterator
for seriesSet.Next() {
series := seriesSet.At()
seriesLabels := series.Labels()
labelsResult = seriesLabels
iterator := series.Iterator()
iterator := series.Iterator(iterator)
for iterator.Next() == chunkenc.ValFloat {
timestamp, value := iterator.At()
result[timestamp] = value


@ -382,7 +382,7 @@ func (s mockSeries) Labels() labels.Labels {
return labels.FromStrings(s.labelSet...)
}
func (s mockSeries) Iterator() chunkenc.Iterator {
func (s mockSeries) Iterator(chunkenc.Iterator) chunkenc.Iterator {
return chunkenc.MockSeriesIterator(s.timestamps, s.values)
}
@ -421,14 +421,17 @@ type Labels interface {
}
type SampleIterable interface {
// Iterator returns a new, independent iterator of the data of the series.
Iterator() chunkenc.Iterator
// Iterator returns an iterator of the data of the series.
// The iterator passed as argument is for re-use, if not nil.
// Depending on implementation, the iterator can
// be re-used or a new iterator can be allocated.
Iterator(chunkenc.Iterator) chunkenc.Iterator
}
type ChunkIterable interface {
// Iterator returns a new, independent iterator that iterates over potentially overlapping
// Iterator returns an iterator that iterates over potentially overlapping
// chunks of the series, sorted by min time.
Iterator() chunks.Iterator
Iterator(chunks.Iterator) chunks.Iterator
}
type Warnings []error
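
Note how loose the new contract on SampleIterable and ChunkIterable above is: an implementation may recycle the iterator passed in, or ignore it entirely and allocate, as mockSeries does. A trivially valid implementation under that contract (a sketch; nopSeries is illustrative, and it assumes chunkenc.NewNopIterator as the no-op iterator constructor):

package example

import (
	"github.com/prometheus/prometheus/tsdb/chunkenc"
)

// nopSeries ignores the iterator offered for re-use, which the new contract
// explicitly allows; re-use is an optimisation, not a requirement.
type nopSeries struct{}

func (nopSeries) Iterator(chunkenc.Iterator) chunkenc.Iterator {
	return chunkenc.NewNopIterator()
}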


@ -425,12 +425,8 @@ func ChainedSeriesMerge(series ...Series) Series {
}
return &SeriesEntry{
Lset: series[0].Labels(),
SampleIteratorFn: func() chunkenc.Iterator {
iterators := make([]chunkenc.Iterator, 0, len(series))
for _, s := range series {
iterators = append(iterators, s.Iterator())
}
return NewChainSampleIterator(iterators)
SampleIteratorFn: func(it chunkenc.Iterator) chunkenc.Iterator {
return ChainSampleIteratorFromSeries(it, series)
},
}
}
@ -446,15 +442,42 @@ type chainSampleIterator struct {
lastT int64
}
// NewChainSampleIterator returns a single iterator that iterates over the samples from the given iterators in a sorted
// fashion. If samples overlap, one sample from overlapped ones is kept (randomly) and all others with the same
// timestamp are dropped.
func NewChainSampleIterator(iterators []chunkenc.Iterator) chunkenc.Iterator {
return &chainSampleIterator{
iterators: iterators,
h: nil,
lastT: math.MinInt64,
// Return a chainSampleIterator initialized for length entries, re-using the memory from it if possible.
func getChainSampleIterator(it chunkenc.Iterator, length int) *chainSampleIterator {
csi, ok := it.(*chainSampleIterator)
if !ok {
csi = &chainSampleIterator{}
}
if cap(csi.iterators) < length {
csi.iterators = make([]chunkenc.Iterator, length)
} else {
csi.iterators = csi.iterators[:length]
}
csi.h = nil
csi.lastT = math.MinInt64
return csi
}
func ChainSampleIteratorFromSeries(it chunkenc.Iterator, series []Series) chunkenc.Iterator {
csi := getChainSampleIterator(it, len(series))
for i, s := range series {
csi.iterators[i] = s.Iterator(csi.iterators[i])
}
return csi
}
func ChainSampleIteratorFromMetas(it chunkenc.Iterator, chunks []chunks.Meta) chunkenc.Iterator {
csi := getChainSampleIterator(it, len(chunks))
for i, c := range chunks {
csi.iterators[i] = c.Chunk.Iterator(csi.iterators[i])
}
return csi
}
func ChainSampleIteratorFromIterators(it chunkenc.Iterator, iterators []chunkenc.Iterator) chunkenc.Iterator {
csi := getChainSampleIterator(it, 0)
csi.iterators = iterators
return csi
}
func (c *chainSampleIterator) Seek(t int64) chunkenc.ValueType {
@ -607,10 +630,10 @@ func NewCompactingChunkSeriesMerger(mergeFunc VerticalSeriesMergeFunc) VerticalC
}
return &ChunkSeriesEntry{
Lset: series[0].Labels(),
ChunkIteratorFn: func() chunks.Iterator {
ChunkIteratorFn: func(chunks.Iterator) chunks.Iterator {
iterators := make([]chunks.Iterator, 0, len(series))
for _, s := range series {
iterators = append(iterators, s.Iterator())
iterators = append(iterators, s.Iterator(nil))
}
return &compactChunkIterator{
mergeFunc: mergeFunc,
@ -693,7 +716,7 @@ func (c *compactChunkIterator) Next() bool {
}
// Add last as it's not yet included in overlap. We operate on same series, so labels does not matter here.
iter = NewSeriesToChunkEncoder(c.mergeFunc(append(overlapping, newChunkToSeriesDecoder(nil, c.curr))...)).Iterator()
iter = NewSeriesToChunkEncoder(c.mergeFunc(append(overlapping, newChunkToSeriesDecoder(nil, c.curr))...)).Iterator(nil)
if !iter.Next() {
if c.err = iter.Err(); c.err != nil {
return false
@ -751,10 +774,10 @@ func NewConcatenatingChunkSeriesMerger() VerticalChunkSeriesMergeFunc {
}
return &ChunkSeriesEntry{
Lset: series[0].Labels(),
ChunkIteratorFn: func() chunks.Iterator {
ChunkIteratorFn: func(chunks.Iterator) chunks.Iterator {
iterators := make([]chunks.Iterator, 0, len(series))
for _, s := range series {
iterators = append(iterators, s.Iterator())
iterators = append(iterators, s.Iterator(nil))
}
return &concatenatingChunkIterator{
iterators: iterators,

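The merge.go hunks above replace the single NewChainSampleIterator constructor with ChainSampleIteratorFromSeries, ChainSampleIteratorFromMetas and ChainSampleIteratorFromIterators, each taking an iterator to recycle (getChainSampleIterator re-uses the chainSampleIterator and its iterators slice when possible). A sketch of a caller that merges several groups of overlapping series while handing the chained iterator back each time (mergeAll is illustrative, not code from this PR):

package example

import (
	"github.com/prometheus/prometheus/storage"
	"github.com/prometheus/prometheus/tsdb/chunkenc"
)

// mergeAll repeatedly chains groups of overlapping series, handing the
// previous chained iterator back so its internal slice can be re-used.
func mergeAll(groups [][]storage.Series) (total int) {
	var it chunkenc.Iterator
	for _, series := range groups {
		it = storage.ChainSampleIteratorFromSeries(it, series)
		for it.Next() != chunkenc.ValNone {
			total++
		}
	}
	return total
}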

@ -202,8 +202,8 @@ func TestMergeQuerierWithChainMerger(t *testing.T) {
expectedSeries := tc.expected.At()
require.Equal(t, expectedSeries.Labels(), actualSeries.Labels())
expSmpl, expErr := ExpandSamples(expectedSeries.Iterator(), nil)
actSmpl, actErr := ExpandSamples(actualSeries.Iterator(), nil)
expSmpl, expErr := ExpandSamples(expectedSeries.Iterator(nil), nil)
actSmpl, actErr := ExpandSamples(actualSeries.Iterator(nil), nil)
require.Equal(t, expErr, actErr)
require.Equal(t, expSmpl, actSmpl)
}
@ -370,8 +370,8 @@ func TestMergeChunkQuerierWithNoVerticalChunkSeriesMerger(t *testing.T) {
expectedSeries := tc.expected.At()
require.Equal(t, expectedSeries.Labels(), actualSeries.Labels())
expChks, expErr := ExpandChunks(expectedSeries.Iterator())
actChks, actErr := ExpandChunks(actualSeries.Iterator())
expChks, expErr := ExpandChunks(expectedSeries.Iterator(nil))
actChks, actErr := ExpandChunks(actualSeries.Iterator(nil))
require.Equal(t, expErr, actErr)
require.Equal(t, expChks, actChks)
@ -533,8 +533,8 @@ func TestCompactingChunkSeriesMerger(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
merged := m(tc.input...)
require.Equal(t, tc.expected.Labels(), merged.Labels())
actChks, actErr := ExpandChunks(merged.Iterator())
expChks, expErr := ExpandChunks(tc.expected.Iterator())
actChks, actErr := ExpandChunks(merged.Iterator(nil))
expChks, expErr := ExpandChunks(tc.expected.Iterator(nil))
require.Equal(t, expErr, actErr)
require.Equal(t, expChks, actChks)
@ -667,8 +667,8 @@ func TestConcatenatingChunkSeriesMerger(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
merged := m(tc.input...)
require.Equal(t, tc.expected.Labels(), merged.Labels())
actChks, actErr := ExpandChunks(merged.Iterator())
expChks, expErr := ExpandChunks(tc.expected.Iterator())
actChks, actErr := ExpandChunks(merged.Iterator(nil))
expChks, expErr := ExpandChunks(tc.expected.Iterator(nil))
require.Equal(t, expErr, actErr)
require.Equal(t, expChks, actChks)
@ -809,7 +809,7 @@ func TestChainSampleIterator(t *testing.T) {
expected: []tsdbutil.Sample{sample{0, 0, nil, nil}, sample{1, 1, nil, nil}, sample{2, 2, nil, nil}, sample{3, 3, nil, nil}},
},
} {
merged := NewChainSampleIterator(tc.input)
merged := ChainSampleIteratorFromIterators(nil, tc.input)
actual, err := ExpandSamples(merged, nil)
require.NoError(t, err)
require.Equal(t, tc.expected, actual)
@ -855,7 +855,7 @@ func TestChainSampleIteratorSeek(t *testing.T) {
expected: []tsdbutil.Sample{sample{0, 0, nil, nil}, sample{1, 1, nil, nil}, sample{2, 2, nil, nil}, sample{3, 3, nil, nil}},
},
} {
merged := NewChainSampleIterator(tc.input)
merged := ChainSampleIteratorFromIterators(nil, tc.input)
actual := []tsdbutil.Sample{}
if merged.Seek(tc.seek) == chunkenc.ValFloat {
t, v := merged.At()
@ -868,9 +868,7 @@ func TestChainSampleIteratorSeek(t *testing.T) {
}
}
var result []tsdbutil.Sample
func makeSeriesSet(numSeries, numSamples int) SeriesSet {
func makeSeries(numSeries, numSamples int) []Series {
series := []Series{}
for j := 0; j < numSeries; j++ {
labels := labels.FromStrings("foo", fmt.Sprintf("bar%d", j))
@ -880,30 +878,39 @@ func makeSeriesSet(numSeries, numSamples int) SeriesSet {
}
series = append(series, NewListSeries(labels, samples))
}
return NewMockSeriesSet(series...)
return series
}
func makeMergeSeriesSet(numSeriesSets, numSeries, numSamples int) SeriesSet {
seriesSets := []genericSeriesSet{}
for i := 0; i < numSeriesSets; i++ {
seriesSets = append(seriesSets, &genericSeriesSetAdapter{makeSeriesSet(numSeries, numSamples)})
func makeMergeSeriesSet(serieses [][]Series) SeriesSet {
seriesSets := make([]genericSeriesSet, len(serieses))
for i, s := range serieses {
seriesSets[i] = &genericSeriesSetAdapter{NewMockSeriesSet(s...)}
}
return &seriesSetAdapter{newGenericMergeSeriesSet(seriesSets, (&seriesMergerAdapter{VerticalSeriesMergeFunc: ChainedSeriesMerge}).Merge)}
}
func benchmarkDrain(seriesSet SeriesSet, b *testing.B) {
func benchmarkDrain(b *testing.B, makeSeriesSet func() SeriesSet) {
var err error
var t int64
var v float64
var iter chunkenc.Iterator
for n := 0; n < b.N; n++ {
seriesSet := makeSeriesSet()
for seriesSet.Next() {
result, err = ExpandSamples(seriesSet.At().Iterator(), nil)
require.NoError(b, err)
iter = seriesSet.At().Iterator(iter)
for iter.Next() == chunkenc.ValFloat {
t, v = iter.At()
}
err = iter.Err()
}
require.NoError(b, err)
require.NotEqual(b, t, v) // To ensure the inner loop doesn't get optimised away.
}
}
func BenchmarkNoMergeSeriesSet_100_100(b *testing.B) {
seriesSet := makeSeriesSet(100, 100)
benchmarkDrain(seriesSet, b)
series := makeSeries(100, 100)
benchmarkDrain(b, func() SeriesSet { return NewMockSeriesSet(series...) })
}
func BenchmarkMergeSeriesSet(b *testing.B) {
@ -914,9 +921,12 @@ func BenchmarkMergeSeriesSet(b *testing.B) {
{10, 100, 100},
{100, 100, 100},
} {
seriesSet := makeMergeSeriesSet(bm.numSeriesSets, bm.numSeries, bm.numSamples)
serieses := [][]Series{}
for i := 0; i < bm.numSeriesSets; i++ {
serieses = append(serieses, makeSeries(bm.numSeries, bm.numSamples))
}
b.Run(fmt.Sprintf("%d_%d_%d", bm.numSeriesSets, bm.numSeries, bm.numSamples), func(b *testing.B) {
benchmarkDrain(seriesSet, b)
benchmarkDrain(b, func() SeriesSet { return makeMergeSeriesSet(serieses) })
})
}
}


@ -33,6 +33,7 @@ import (
"github.com/prometheus/prometheus/prompb"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/prometheus/prometheus/tsdb/chunks"
)
// decodeReadLimit is the maximum size of a read request body in bytes.
@ -115,9 +116,10 @@ func ToQuery(from, to int64, matchers []*labels.Matcher, hints *storage.SelectHi
func ToQueryResult(ss storage.SeriesSet, sampleLimit int) (*prompb.QueryResult, storage.Warnings, error) {
numSamples := 0
resp := &prompb.QueryResult{}
var iter chunkenc.Iterator
for ss.Next() {
series := ss.At()
iter := series.Iterator()
iter = series.Iterator(iter)
samples := []prompb.Sample{}
for iter.Next() == chunkenc.ValFloat {
@ -199,11 +201,12 @@ func StreamChunkedReadResponses(
var (
chks []prompb.Chunk
lbls []prompb.Label
iter chunks.Iterator
)
for ss.Next() {
series := ss.At()
iter := series.Iterator()
iter = series.Iterator(iter)
lbls = MergeLabels(labelsToLabelsProto(series.Labels(), lbls), sortedExternalLabels)
frameBytesLeft := maxBytesInFrame
@ -346,7 +349,11 @@ func (c *concreteSeries) Labels() labels.Labels {
return labels.New(c.labels...)
}
func (c *concreteSeries) Iterator() chunkenc.Iterator {
func (c *concreteSeries) Iterator(it chunkenc.Iterator) chunkenc.Iterator {
if csi, ok := it.(*concreteSeriesIterator); ok {
csi.reset(c)
return csi
}
return newConcreteSeriersIterator(c)
}
@ -363,6 +370,11 @@ func newConcreteSeriersIterator(series *concreteSeries) chunkenc.Iterator {
}
}
func (c *concreteSeriesIterator) reset(series *concreteSeries) {
c.cur = -1
c.series = series
}
// Seek implements storage.SeriesIterator.
func (c *concreteSeriesIterator) Seek(t int64) chunkenc.ValueType {
if c.cur == -1 {


@ -215,7 +215,7 @@ func TestConcreteSeriesIterator(t *testing.T) {
{Value: 4, Timestamp: 4},
},
}
it := series.Iterator()
it := series.Iterator(nil)
// Seek to the first sample with ts=1.
require.Equal(t, chunkenc.ValFloat, it.Seek(1))


@ -27,26 +27,31 @@ import (
type SeriesEntry struct {
Lset labels.Labels
SampleIteratorFn func() chunkenc.Iterator
SampleIteratorFn func(chunkenc.Iterator) chunkenc.Iterator
}
func (s *SeriesEntry) Labels() labels.Labels { return s.Lset }
func (s *SeriesEntry) Iterator() chunkenc.Iterator { return s.SampleIteratorFn() }
func (s *SeriesEntry) Labels() labels.Labels { return s.Lset }
func (s *SeriesEntry) Iterator(it chunkenc.Iterator) chunkenc.Iterator { return s.SampleIteratorFn(it) }
type ChunkSeriesEntry struct {
Lset labels.Labels
ChunkIteratorFn func() chunks.Iterator
ChunkIteratorFn func(chunks.Iterator) chunks.Iterator
}
func (s *ChunkSeriesEntry) Labels() labels.Labels { return s.Lset }
func (s *ChunkSeriesEntry) Iterator() chunks.Iterator { return s.ChunkIteratorFn() }
func (s *ChunkSeriesEntry) Labels() labels.Labels { return s.Lset }
func (s *ChunkSeriesEntry) Iterator(it chunks.Iterator) chunks.Iterator { return s.ChunkIteratorFn(it) }
// NewListSeries returns series entry with iterator that allows to iterate over provided samples.
func NewListSeries(lset labels.Labels, s []tsdbutil.Sample) *SeriesEntry {
samplesS := Samples(samples(s))
return &SeriesEntry{
Lset: lset,
SampleIteratorFn: func() chunkenc.Iterator {
return NewListSeriesIterator(samples(s))
SampleIteratorFn: func(it chunkenc.Iterator) chunkenc.Iterator {
if lsi, ok := it.(*listSeriesIterator); ok {
lsi.Reset(samplesS)
return lsi
}
return NewListSeriesIterator(samplesS)
},
}
}
@ -56,11 +61,21 @@ func NewListSeries(lset labels.Labels, s []tsdbutil.Sample) *SeriesEntry {
func NewListChunkSeriesFromSamples(lset labels.Labels, samples ...[]tsdbutil.Sample) *ChunkSeriesEntry {
return &ChunkSeriesEntry{
Lset: lset,
ChunkIteratorFn: func() chunks.Iterator {
chks := make([]chunks.Meta, 0, len(samples))
ChunkIteratorFn: func(it chunks.Iterator) chunks.Iterator {
lcsi, existing := it.(*listChunkSeriesIterator)
var chks []chunks.Meta
if existing {
chks = lcsi.chks[:0]
} else {
chks = make([]chunks.Meta, 0, len(samples))
}
for _, s := range samples {
chks = append(chks, tsdbutil.ChunkFromSamples(s))
}
if existing {
lcsi.Reset(chks...)
return lcsi
}
return NewListChunkSeriesIterator(chks...)
},
}
@ -87,6 +102,11 @@ func NewListSeriesIterator(samples Samples) chunkenc.Iterator {
return &listSeriesIterator{samples: samples, idx: -1}
}
func (it *listSeriesIterator) Reset(samples Samples) {
it.samples = samples
it.idx = -1
}
func (it *listSeriesIterator) At() (int64, float64) {
s := it.samples.Get(it.idx)
return s.T(), s.V()
@ -150,6 +170,11 @@ func NewListChunkSeriesIterator(chks ...chunks.Meta) chunks.Iterator {
return &listChunkSeriesIterator{chks: chks, idx: -1}
}
func (it *listChunkSeriesIterator) Reset(chks ...chunks.Meta) {
it.chks = chks
it.idx = -1
}
func (it *listChunkSeriesIterator) At() chunks.Meta {
return it.chks[it.idx]
}
@ -164,6 +189,7 @@ func (it *listChunkSeriesIterator) Err() error { return nil }
type chunkSetToSeriesSet struct {
ChunkSeriesSet
iter chunks.Iterator
chkIterErr error
sameSeriesChunks []Series
}
@ -178,18 +204,18 @@ func (c *chunkSetToSeriesSet) Next() bool {
return false
}
iter := c.ChunkSeriesSet.At().Iterator()
c.iter = c.ChunkSeriesSet.At().Iterator(c.iter)
c.sameSeriesChunks = c.sameSeriesChunks[:0]
for iter.Next() {
for c.iter.Next() {
c.sameSeriesChunks = append(
c.sameSeriesChunks,
newChunkToSeriesDecoder(c.ChunkSeriesSet.At().Labels(), iter.At()),
newChunkToSeriesDecoder(c.ChunkSeriesSet.At().Labels(), c.iter.At()),
)
}
if iter.Err() != nil {
c.chkIterErr = iter.Err()
if c.iter.Err() != nil {
c.chkIterErr = c.iter.Err()
return false
}
return true
@ -210,9 +236,9 @@ func (c *chunkSetToSeriesSet) Err() error {
func newChunkToSeriesDecoder(labels labels.Labels, chk chunks.Meta) Series {
return &SeriesEntry{
Lset: labels,
SampleIteratorFn: func() chunkenc.Iterator {
SampleIteratorFn: func(it chunkenc.Iterator) chunkenc.Iterator {
// TODO(bwplotka): Can we provide any chunkenc buffer?
return chk.Chunk.Iterator(nil)
return chk.Chunk.Iterator(it)
},
}
}
@ -252,7 +278,7 @@ func NewSeriesToChunkEncoder(series Series) ChunkSeries {
return &seriesToChunkEncoder{series}
}
func (s *seriesToChunkEncoder) Iterator() chunks.Iterator {
func (s *seriesToChunkEncoder) Iterator(it chunks.Iterator) chunks.Iterator {
var (
chk chunkenc.Chunk
app chunkenc.Appender
@ -261,9 +287,14 @@ func (s *seriesToChunkEncoder) Iterator() chunks.Iterator {
mint := int64(math.MaxInt64)
maxt := int64(math.MinInt64)
chks := []chunks.Meta{}
var chks []chunks.Meta
lcsi, existing := it.(*listChunkSeriesIterator)
if existing {
chks = lcsi.chks[:0]
}
i := 0
seriesIter := s.Series.Iterator()
seriesIter := s.Series.Iterator(nil)
lastType := chunkenc.ValNone
for typ := seriesIter.Next(); typ != chunkenc.ValNone; typ = seriesIter.Next() {
if typ != lastType || i >= seriesToChunkEncoderSplit {
@ -323,6 +354,10 @@ func (s *seriesToChunkEncoder) Iterator() chunks.Iterator {
})
}
if existing {
lcsi.Reset(chks...)
return lcsi
}
return NewListChunkSeriesIterator(chks...)
}
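
One detail worth calling out in the series.go hunks above: when an old iterator is handed back, its chks slice is truncated to zero length and refilled, so the backing array is recycled along with the iterator struct (chks = lcsi.chks[:0] in NewListChunkSeriesFromSamples and seriesToChunkEncoder). The slice trick in isolation (plain Go, not PR code):

package example

// refill keeps dst's backing array, drops the old contents, and appends the
// new elements into it, avoiding a fresh allocation when capacity suffices.
func refill(dst, src []int) []int {
	dst = dst[:0] // length 0, capacity preserved
	return append(dst, src...)
}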


@ -203,7 +203,7 @@ func TestCorruptedChunk(t *testing.T) {
// Check chunk errors during iter time.
require.True(t, set.Next())
it := set.At().Iterator()
it := set.At().Iterator(nil)
require.Equal(t, chunkenc.ValNone, it.Next())
require.Equal(t, tc.iterErr.Error(), it.Err().Error())
})
@ -505,11 +505,12 @@ func createHead(tb testing.TB, w *wlog.WL, series []storage.Series, chunkDir str
head, err := NewHead(nil, nil, w, nil, opts, nil)
require.NoError(tb, err)
var it chunkenc.Iterator
ctx := context.Background()
app := head.Appender(ctx)
for _, s := range series {
ref := storage.SeriesRef(0)
it := s.Iterator()
it = s.Iterator(it)
lset := s.Labels()
typ := it.Next()
lastTyp := typ
@ -550,11 +551,12 @@ func createHeadWithOOOSamples(tb testing.TB, w *wlog.WL, series []storage.Series
oooSampleLabels := make([]labels.Labels, 0, len(series))
oooSamples := make([]tsdbutil.SampleSlice, 0, len(series))
var it chunkenc.Iterator
totalSamples := 0
app := head.Appender(context.Background())
for _, s := range series {
ref := storage.SeriesRef(0)
it := s.Iterator()
it = s.Iterator(it)
lset := s.Labels()
os := tsdbutil.SampleSlice{}
count := 0


@ -746,8 +746,9 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
}
var (
ref = storage.SeriesRef(0)
chks []chunks.Meta
ref = storage.SeriesRef(0)
chks []chunks.Meta
chksIter chunks.Iterator
)
set := sets[0]
@ -765,7 +766,7 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
default:
}
s := set.At()
chksIter := s.Iterator()
chksIter = s.Iterator(chksIter)
chks = chks[:0]
for chksIter.Next() {
// We are not iterating in streaming way over chunk as

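The compactor applies the same recycling to chunk iterators: chksIter is declared once and re-used for every series in populateBlock. A sketch of that drain shape against the post-change storage.ChunkSeriesSet interface (collectChunks is illustrative, not code from this PR):

package example

import (
	"github.com/prometheus/prometheus/storage"
	"github.com/prometheus/prometheus/tsdb/chunks"
)

// collectChunks drains a ChunkSeriesSet, recycling one chunks.Iterator
// across series the same way populateBlock does above.
func collectChunks(set storage.ChunkSeriesSet) ([]chunks.Meta, error) {
	var (
		out      []chunks.Meta
		chksIter chunks.Iterator
	)
	for set.Next() {
		chksIter = set.At().Iterator(chksIter)
		for chksIter.Next() {
			out = append(out, chksIter.At())
		}
		if err := chksIter.Err(); err != nil {
			return nil, err
		}
	}
	return out, set.Err()
}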

@ -93,12 +93,13 @@ func query(t testing.TB, q storage.Querier, matchers ...*labels.Matcher) map[str
require.NoError(t, q.Close())
}()
var it chunkenc.Iterator
result := map[string][]tsdbutil.Sample{}
for ss.Next() {
series := ss.At()
samples := []tsdbutil.Sample{}
it := series.Iterator()
it = series.Iterator(it)
for typ := it.Next(); typ != chunkenc.ValNone; typ = it.Next() {
switch typ {
case chunkenc.ValFloat:
@ -133,12 +134,13 @@ func queryChunks(t testing.TB, q storage.ChunkQuerier, matchers ...*labels.Match
require.NoError(t, q.Close())
}()
var it chunks.Iterator
result := map[string][]chunks.Meta{}
for ss.Next() {
series := ss.At()
chks := []chunks.Meta{}
it := series.Iterator()
it = series.Iterator(it)
for it.Next() {
chks = append(chks, it.At())
}
@ -454,8 +456,8 @@ Outer:
require.Equal(t, sexp.Labels(), sres.Labels())
smplExp, errExp := storage.ExpandSamples(sexp.Iterator(), nil)
smplRes, errRes := storage.ExpandSamples(sres.Iterator(), nil)
smplExp, errExp := storage.ExpandSamples(sexp.Iterator(nil), nil)
smplRes, errRes := storage.ExpandSamples(sres.Iterator(nil), nil)
require.Equal(t, errExp, errRes)
require.Equal(t, smplExp, smplRes)
@ -628,9 +630,10 @@ func TestDB_Snapshot(t *testing.T) {
// sum values
seriesSet := querier.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"))
var series chunkenc.Iterator
sum := 0.0
for seriesSet.Next() {
series := seriesSet.At().Iterator()
series = seriesSet.At().Iterator(series)
for series.Next() == chunkenc.ValFloat {
_, v := series.At()
sum += v
@ -676,9 +679,10 @@ func TestDB_Snapshot_ChunksOutsideOfCompactedRange(t *testing.T) {
// Sum values.
seriesSet := querier.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"))
var series chunkenc.Iterator
sum := 0.0
for seriesSet.Next() {
series := seriesSet.At().Iterator()
series = seriesSet.At().Iterator(series)
for series.Next() == chunkenc.ValFloat {
_, v := series.At()
sum += v
@ -770,8 +774,8 @@ Outer:
require.Equal(t, sexp.Labels(), sres.Labels())
smplExp, errExp := storage.ExpandSamples(sexp.Iterator(), nil)
smplRes, errRes := storage.ExpandSamples(sres.Iterator(), nil)
smplExp, errExp := storage.ExpandSamples(sexp.Iterator(nil), nil)
smplRes, errRes := storage.ExpandSamples(sres.Iterator(nil), nil)
require.Equal(t, errExp, errRes)
require.Equal(t, smplExp, smplRes)
@ -921,7 +925,7 @@ func TestDB_e2e(t *testing.T) {
for ss.Next() {
x := ss.At()
smpls, err := storage.ExpandSamples(x.Iterator(), newSample)
smpls, err := storage.ExpandSamples(x.Iterator(nil), newSample)
require.NoError(t, err)
if len(smpls) > 0 {
@ -1108,12 +1112,13 @@ func testWALReplayRaceOnSamplesLoggedBeforeSeries(t *testing.T, numSamplesBefore
set := q.Select(false, nil, labels.MustNewMatcher(labels.MatchRegexp, "series_id", ".+"))
actualSeries := 0
var chunksIt chunks.Iterator
for set.Next() {
actualSeries++
actualChunks := 0
chunksIt := set.At().Iterator()
chunksIt = set.At().Iterator(chunksIt)
for chunksIt.Next() {
actualChunks++
}
@ -1205,8 +1210,8 @@ func TestTombstoneClean(t *testing.T) {
require.Equal(t, sexp.Labels(), sres.Labels())
smplExp, errExp := storage.ExpandSamples(sexp.Iterator(), nil)
smplRes, errRes := storage.ExpandSamples(sres.Iterator(), nil)
smplExp, errExp := storage.ExpandSamples(sexp.Iterator(nil), nil)
smplRes, errRes := storage.ExpandSamples(sres.Iterator(nil), nil)
require.Equal(t, errExp, errRes)
require.Equal(t, smplExp, smplRes)
@ -1479,11 +1484,12 @@ func TestSizeRetention(t *testing.T) {
// Add some data to the WAL.
headApp := db.Head().Appender(context.Background())
var aSeries labels.Labels
var it chunkenc.Iterator
for _, m := range headBlocks {
series := genSeries(100, 10, m.MinTime, m.MaxTime+1)
for _, s := range series {
aSeries = s.Labels()
it := s.Iterator()
it = s.Iterator(it)
for it.Next() == chunkenc.ValFloat {
tim, v := it.At()
_, err := headApp.Append(0, s.Labels(), tim, v)
@ -1691,10 +1697,11 @@ func TestNotMatcherSelectsLabelsUnsetSeries(t *testing.T) {
func expandSeriesSet(ss storage.SeriesSet) ([]labels.Labels, map[string][]sample, storage.Warnings, error) {
resultLabels := []labels.Labels{}
resultSamples := map[string][]sample{}
var it chunkenc.Iterator
for ss.Next() {
series := ss.At()
samples := []sample{}
it := series.Iterator()
it = series.Iterator(it)
for it.Next() == chunkenc.ValFloat {
t, v := it.At()
samples = append(samples, sample{t: t, v: v})
@ -2500,10 +2507,11 @@ func TestDBReadOnly_FlushWAL(t *testing.T) {
// Sum the values.
seriesSet := querier.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, defaultLabelName, "flush"))
var series chunkenc.Iterator
sum := 0.0
for seriesSet.Next() {
series := seriesSet.At().Iterator()
series = seriesSet.At().Iterator(series)
for series.Next() == chunkenc.ValFloat {
_, v := series.At()
sum += v
@ -2946,10 +2954,11 @@ func TestCompactHead(t *testing.T) {
defer func() { require.NoError(t, querier.Close()) }()
seriesSet := querier.Select(false, nil, &labels.Matcher{Type: labels.MatchEqual, Name: "a", Value: "b"})
var series chunkenc.Iterator
var actSamples []sample
for seriesSet.Next() {
series := seriesSet.At().Iterator()
series = seriesSet.At().Iterator(series)
for series.Next() == chunkenc.ValFloat {
time, val := series.At()
actSamples = append(actSamples, sample{int64(time), val, nil, nil})
@ -3347,7 +3356,7 @@ func testQuerierShouldNotPanicIfHeadChunkIsTruncatedWhileReadingQueriedChunks(t
actualSeries++
// Get the iterator and call Next() so that we're sure the chunk is loaded.
it := seriesSet.At().Iterator()
it := seriesSet.At().Iterator(nil)
it.Next()
it.At()
@ -3477,11 +3486,13 @@ func testChunkQuerierShouldNotPanicIfHeadChunkIsTruncatedWhileReadingQueriedChun
seriesSet := querier.Select(true, hints, labels.MustNewMatcher(labels.MatchRegexp, labels.MetricName, ".+"))
// Iterate all series and get their chunks.
var it chunks.Iterator
var chunks []chunkenc.Chunk
actualSeries := 0
for seriesSet.Next() {
actualSeries++
for it := seriesSet.At().Iterator(); it.Next(); {
it = seriesSet.At().Iterator(it)
for it.Next() {
chunks = append(chunks, it.At().Chunk)
}
}
@ -6025,13 +6036,14 @@ func TestQueryHistogramFromBlocksWithCompaction(t *testing.T) {
ctx := context.Background()
var it chunkenc.Iterator
exp := make(map[string][]tsdbutil.Sample)
for _, series := range blockSeries {
createBlock(t, db.Dir(), series)
for _, s := range series {
key := s.Labels().String()
it := s.Iterator()
it = s.Iterator(it)
slice := exp[key]
for typ := it.Next(); typ != chunkenc.ValNone; typ = it.Next() {
switch typ {


@ -67,7 +67,7 @@ func Example() {
series := ss.At()
fmt.Println("series:", series.Labels().String())
it := series.Iterator()
it := series.Iterator(nil)
for it.Next() == chunkenc.ValFloat {
_, v := it.At() // We ignore the timestamp here, only to have a predictable output we can test against (below)
fmt.Println("sample", v)


@ -503,11 +503,7 @@ func (o mergedOOOChunks) Appender() (chunkenc.Appender, error) {
}
func (o mergedOOOChunks) Iterator(iterator chunkenc.Iterator) chunkenc.Iterator {
iterators := make([]chunkenc.Iterator, 0, len(o.chunks))
for _, c := range o.chunks {
iterators = append(iterators, c.Chunk.Iterator(nil))
}
return storage.NewChainSampleIterator(iterators)
return storage.ChainSampleIteratorFromMetas(iterator, o.chunks)
}
func (o mergedOOOChunks) NumSamples() int {


@ -924,8 +924,8 @@ func TestHeadDeleteSimple(t *testing.T) {
require.Equal(t, expSeries.Labels(), actSeries.Labels())
smplExp, errExp := storage.ExpandSamples(expSeries.Iterator(), nil)
smplRes, errRes := storage.ExpandSamples(actSeries.Iterator(), nil)
smplExp, errExp := storage.ExpandSamples(expSeries.Iterator(nil), nil)
smplRes, errRes := storage.ExpandSamples(actSeries.Iterator(nil), nil)
require.Equal(t, errExp, errRes)
require.Equal(t, smplExp, smplRes)
@ -959,7 +959,7 @@ func TestDeleteUntilCurMax(t *testing.T) {
res := q.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b"))
require.True(t, res.Next(), "series is not present")
s := res.At()
it := s.Iterator()
it := s.Iterator(nil)
require.Equal(t, chunkenc.ValNone, it.Next(), "expected no samples")
for res.Next() {
}
@ -976,7 +976,7 @@ func TestDeleteUntilCurMax(t *testing.T) {
res = q.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b"))
require.True(t, res.Next(), "series don't exist")
exps := res.At()
it = exps.Iterator()
it = exps.Iterator(nil)
resSamples, err := storage.ExpandSamples(it, newSample)
require.NoError(t, err)
require.Equal(t, []tsdbutil.Sample{sample{11, 1, nil, nil}}, resSamples)
@ -1163,7 +1163,7 @@ func TestDelete_e2e(t *testing.T) {
eok, rok := expSs.Next(), ss.Next()
// Skip a series if iterator is empty.
if rok {
for ss.At().Iterator().Next() == chunkenc.ValNone {
for ss.At().Iterator(nil).Next() == chunkenc.ValNone {
rok = ss.Next()
if !rok {
break
@ -1177,8 +1177,8 @@ func TestDelete_e2e(t *testing.T) {
sexp := expSs.At()
sres := ss.At()
require.Equal(t, sexp.Labels(), sres.Labels())
smplExp, errExp := storage.ExpandSamples(sexp.Iterator(), nil)
smplRes, errRes := storage.ExpandSamples(sres.Iterator(), nil)
smplExp, errExp := storage.ExpandSamples(sexp.Iterator(nil), nil)
smplRes, errRes := storage.ExpandSamples(sres.Iterator(nil), nil)
require.Equal(t, errExp, errRes)
require.Equal(t, smplExp, smplRes)
}
@ -2635,7 +2635,7 @@ func TestChunkNotFoundHeadGCRace(t *testing.T) {
<-time.After(3 * time.Second)
// Now consume after compaction when it's gone.
it := s.Iterator()
it := s.Iterator(nil)
for it.Next() == chunkenc.ValFloat {
_, _ = it.At()
}
@ -2643,7 +2643,7 @@ func TestChunkNotFoundHeadGCRace(t *testing.T) {
require.NoError(t, it.Err())
for ss.Next() {
s = ss.At()
it := s.Iterator()
it = s.Iterator(it)
for it.Next() == chunkenc.ValFloat {
_, _ = it.At()
}
@ -2841,7 +2841,7 @@ func TestAppendHistogram(t *testing.T) {
s := ss.At()
require.False(t, ss.Next())
it := s.Iterator()
it := s.Iterator(nil)
actHistograms := make([]timedHistogram, 0, len(expHistograms))
for it.Next() == chunkenc.ValHistogram {
t, h := it.AtHistogram()
@ -3304,7 +3304,7 @@ func TestHistogramStaleSample(t *testing.T) {
s := ss.At()
require.False(t, ss.Next())
it := s.Iterator()
it := s.Iterator(nil)
actHistograms := make([]timedHistogram, 0, len(expHistograms))
for it.Next() == chunkenc.ValHistogram {
t, h := it.AtHistogram()
@ -3581,7 +3581,7 @@ func TestAppendingDifferentEncodingToSameSeries(t *testing.T) {
ss := q.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b"))
require.True(t, ss.Next())
s := ss.At()
it := s.Iterator()
it := s.Iterator(nil)
expIdx := 0
loop:
for {


@ -426,6 +426,16 @@ func labelNamesWithMatchers(r IndexReader, matchers ...*labels.Matcher) ([]strin
return r.LabelNamesFor(postings...)
}
// seriesData, used inside other iterators, are updated when we move from one series to another.
type seriesData struct {
chks []chunks.Meta
intervals tombstones.Intervals
labels labels.Labels
}
// Labels implements part of storage.Series and storage.ChunkSeries.
func (s *seriesData) Labels() labels.Labels { return s.labels }
// blockBaseSeriesSet allows to iterate over all series in the single block.
// Iterated series are trimmed with given min and max time as well as tombstones.
// See newBlockSeriesSet and newBlockChunkSeriesSet to use it for either sample or chunk iterating.
@ -438,8 +448,7 @@ type blockBaseSeriesSet struct {
mint, maxt int64
disableTrimming bool
currIterFn func() *populateWithDelGenericSeriesIterator
currLabels labels.Labels
curr seriesData
bufChks []chunks.Meta
bufLbls labels.Labels
@ -519,12 +528,11 @@ func (b *blockBaseSeriesSet) Next() bool {
intervals = intervals.Add(tombstones.Interval{Mint: b.maxt + 1, Maxt: math.MaxInt64})
}
b.currLabels = make(labels.Labels, len(b.bufLbls))
copy(b.currLabels, b.bufLbls)
b.curr.labels = make(labels.Labels, len(b.bufLbls))
copy(b.curr.labels, b.bufLbls)
b.curr.chks = chks
b.curr.intervals = intervals
b.currIterFn = func() *populateWithDelGenericSeriesIterator {
return newPopulateWithDelGenericSeriesIterator(b.blockID, b.chunks, chks, intervals)
}
return true
}
return false
@ -556,29 +564,26 @@ type populateWithDelGenericSeriesIterator struct {
// the same, single series.
chks []chunks.Meta
i int
i int // Index into chks; -1 if not started yet.
err error
bufIter *DeletedIterator
bufIter DeletedIterator // Retained for memory re-use. currDelIter may point here.
intervals tombstones.Intervals
currDelIter chunkenc.Iterator
currChkMeta chunks.Meta
}
func newPopulateWithDelGenericSeriesIterator(
blockID ulid.ULID,
chunks ChunkReader,
chks []chunks.Meta,
intervals tombstones.Intervals,
) *populateWithDelGenericSeriesIterator {
return &populateWithDelGenericSeriesIterator{
blockID: blockID,
chunks: chunks,
chks: chks,
i: -1,
bufIter: &DeletedIterator{},
intervals: intervals,
}
func (p *populateWithDelGenericSeriesIterator) reset(blockID ulid.ULID, cr ChunkReader, chks []chunks.Meta, intervals tombstones.Intervals) {
p.blockID = blockID
p.chunks = cr
p.chks = chks
p.i = -1
p.err = nil
p.bufIter.Iter = nil
p.bufIter.Intervals = p.bufIter.Intervals[:0]
p.intervals = intervals
p.currDelIter = nil
p.currChkMeta = chunks.Meta{}
}
func (p *populateWithDelGenericSeriesIterator) next() bool {
@ -618,28 +623,55 @@ func (p *populateWithDelGenericSeriesIterator) next() bool {
// We don't want the full chunk, or it's potentially still opened, take
// just a part of it.
p.bufIter.Iter = p.currChkMeta.Chunk.Iterator(nil)
p.currDelIter = p.bufIter
p.bufIter.Iter = p.currChkMeta.Chunk.Iterator(p.bufIter.Iter)
p.currDelIter = &p.bufIter
return true
}
func (p *populateWithDelGenericSeriesIterator) Err() error { return p.err }
func (p *populateWithDelGenericSeriesIterator) toSeriesIterator() chunkenc.Iterator {
return &populateWithDelSeriesIterator{populateWithDelGenericSeriesIterator: p}
type blockSeriesEntry struct {
chunks ChunkReader
blockID ulid.ULID
seriesData
}
func (p *populateWithDelGenericSeriesIterator) toChunkSeriesIterator() chunks.Iterator {
return &populateWithDelChunkSeriesIterator{populateWithDelGenericSeriesIterator: p}
func (s *blockSeriesEntry) Iterator(it chunkenc.Iterator) chunkenc.Iterator {
pi, ok := it.(*populateWithDelSeriesIterator)
if !ok {
pi = &populateWithDelSeriesIterator{}
}
pi.reset(s.blockID, s.chunks, s.chks, s.intervals)
return pi
}
type chunkSeriesEntry struct {
chunks ChunkReader
blockID ulid.ULID
seriesData
}
func (s *chunkSeriesEntry) Iterator(it chunks.Iterator) chunks.Iterator {
pi, ok := it.(*populateWithDelChunkSeriesIterator)
if !ok {
pi = &populateWithDelChunkSeriesIterator{}
}
pi.reset(s.blockID, s.chunks, s.chks, s.intervals)
return pi
}
// populateWithDelSeriesIterator allows to iterate over samples for the single series.
type populateWithDelSeriesIterator struct {
*populateWithDelGenericSeriesIterator
populateWithDelGenericSeriesIterator
curr chunkenc.Iterator
}
func (p *populateWithDelSeriesIterator) reset(blockID ulid.ULID, cr ChunkReader, chks []chunks.Meta, intervals tombstones.Intervals) {
p.populateWithDelGenericSeriesIterator.reset(blockID, cr, chks, intervals)
p.curr = nil
}
func (p *populateWithDelSeriesIterator) Next() chunkenc.ValueType {
if p.curr != nil {
if valueType := p.curr.Next(); valueType != chunkenc.ValNone {
@ -701,11 +733,16 @@ func (p *populateWithDelSeriesIterator) Err() error {
}
type populateWithDelChunkSeriesIterator struct {
*populateWithDelGenericSeriesIterator
populateWithDelGenericSeriesIterator
curr chunks.Meta
}
func (p *populateWithDelChunkSeriesIterator) reset(blockID ulid.ULID, cr ChunkReader, chks []chunks.Meta, intervals tombstones.Intervals) {
p.populateWithDelGenericSeriesIterator.reset(blockID, cr, chks, intervals)
p.curr = chunks.Meta{}
}
func (p *populateWithDelChunkSeriesIterator) Next() bool {
if !p.next() {
return false
@ -834,13 +871,11 @@ func newBlockSeriesSet(i IndexReader, c ChunkReader, t tombstones.Reader, p inde
}
func (b *blockSeriesSet) At() storage.Series {
// At can be looped over before iterating, so save the current value locally.
currIterFn := b.currIterFn
return &storage.SeriesEntry{
Lset: b.currLabels,
SampleIteratorFn: func() chunkenc.Iterator {
return currIterFn().toSeriesIterator()
},
// At can be looped over before iterating, so save the current values locally.
return &blockSeriesEntry{
chunks: b.chunks,
blockID: b.blockID,
seriesData: b.curr,
}
}
@ -868,13 +903,11 @@ func newBlockChunkSeriesSet(id ulid.ULID, i IndexReader, c ChunkReader, t tombst
}
func (b *blockChunkSeriesSet) At() storage.ChunkSeries {
// At can be looped over before iterating, so save the current value locally.
currIterFn := b.currIterFn
return &storage.ChunkSeriesEntry{
Lset: b.currLabels,
ChunkIteratorFn: func() chunks.Iterator {
return currIterFn().toChunkSeriesIterator()
},
// At can be looped over before iterating, so save the current values locally.
return &chunkSeriesEntry{
chunks: b.chunks,
blockID: b.blockID,
seriesData: b.curr,
}
}
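
This querier.go change is the largest structural one: the closure-based storage.SeriesEntry/ChunkSeriesEntry returned from At() are replaced by blockSeriesEntry and chunkSeriesEntry values carrying seriesData, and populateWithDelGenericSeriesIterator is now embedded by value and rewound via reset() instead of being allocated per series. A self-contained sketch of the embed-by-value-plus-reset shape with simplified stand-in types (none of these are the real tsdb types):

package example

type genericIterator struct {
	chunkRefs []uint64
	i         int
	buf       []int64 // retained between resets, like bufIter above
}

// reset rewinds the iterator onto the next series' state while keeping its
// internal buffers, so no fresh allocation is needed per series.
func (g *genericIterator) reset(chunkRefs []uint64) {
	g.chunkRefs = chunkRefs
	g.i = -1
	g.buf = g.buf[:0]
}

type sampleIterator struct {
	genericIterator // embedded by value: one allocation covers both layers
}

type seriesEntry struct {
	chunkRefs []uint64
}

// Iterator recycles a *sampleIterator when one is passed back in, mirroring
// blockSeriesEntry.Iterator in the hunk above.
func (s seriesEntry) Iterator(it any) any {
	si, ok := it.(*sampleIterator)
	if !ok {
		si = &sampleIterator{}
	}
	si.reset(s.chunkRefs)
	return si
}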


@ -194,8 +194,8 @@ func testBlockQuerier(t *testing.T, c blockQuerierTestCase, ir IndexReader, cr C
sres := res.At()
require.Equal(t, sexp.Labels(), sres.Labels())
smplExp, errExp := storage.ExpandSamples(sexp.Iterator(), nil)
smplRes, errRes := storage.ExpandSamples(sres.Iterator(), nil)
smplExp, errExp := storage.ExpandSamples(sexp.Iterator(nil), nil)
smplRes, errRes := storage.ExpandSamples(sres.Iterator(nil), nil)
require.Equal(t, errExp, errRes)
require.Equal(t, smplExp, smplRes)
@ -230,9 +230,9 @@ func testBlockQuerier(t *testing.T, c blockQuerierTestCase, ir IndexReader, cr C
require.Equal(t, sexpChks.Labels(), sres.Labels())
chksExp, errExp := storage.ExpandChunks(sexpChks.Iterator())
chksExp, errExp := storage.ExpandChunks(sexpChks.Iterator(nil))
rmChunkRefs(chksExp)
chksRes, errRes := storage.ExpandChunks(sres.Iterator())
chksRes, errRes := storage.ExpandChunks(sres.Iterator(nil))
rmChunkRefs(chksRes)
require.Equal(t, errExp, errRes)
require.Equal(t, chksExp, chksRes)
@ -859,7 +859,8 @@ func TestPopulateWithTombSeriesIterators(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
t.Run("sample", func(t *testing.T) {
f, chkMetas := createFakeReaderAndNotPopulatedChunks(tc.chks...)
it := newPopulateWithDelGenericSeriesIterator(ulid.ULID{}, f, chkMetas, tc.intervals).toSeriesIterator()
it := &populateWithDelSeriesIterator{}
it.reset(ulid.ULID{}, f, chkMetas, tc.intervals)
var r []tsdbutil.Sample
if tc.seek != 0 {
@ -879,7 +880,8 @@ func TestPopulateWithTombSeriesIterators(t *testing.T) {
})
t.Run("chunk", func(t *testing.T) {
f, chkMetas := createFakeReaderAndNotPopulatedChunks(tc.chks...)
it := newPopulateWithDelGenericSeriesIterator(ulid.ULID{}, f, chkMetas, tc.intervals).toChunkSeriesIterator()
it := &populateWithDelChunkSeriesIterator{}
it.reset(ulid.ULID{}, f, chkMetas, tc.intervals)
if tc.seek != 0 {
// Chunk iterator does not have Seek method.
@ -911,7 +913,8 @@ func TestPopulateWithDelSeriesIterator_DoubleSeek(t *testing.T) {
[]tsdbutil.Sample{sample{4, 4, nil, nil}, sample{5, 5, nil, nil}},
)
it := newPopulateWithDelGenericSeriesIterator(ulid.ULID{}, f, chkMetas, nil).toSeriesIterator()
it := &populateWithDelSeriesIterator{}
it.reset(ulid.ULID{}, f, chkMetas, nil)
require.Equal(t, chunkenc.ValFloat, it.Seek(1))
require.Equal(t, chunkenc.ValFloat, it.Seek(2))
require.Equal(t, chunkenc.ValFloat, it.Seek(2))
@ -929,7 +932,8 @@ func TestPopulateWithDelSeriesIterator_SeekInCurrentChunk(t *testing.T) {
[]tsdbutil.Sample{},
)
it := newPopulateWithDelGenericSeriesIterator(ulid.ULID{}, f, chkMetas, nil).toSeriesIterator()
it := &populateWithDelSeriesIterator{}
it.reset(ulid.ULID{}, f, chkMetas, nil)
require.Equal(t, chunkenc.ValFloat, it.Next())
ts, v := it.At()
require.Equal(t, int64(1), ts)
@ -946,7 +950,8 @@ func TestPopulateWithDelSeriesIterator_SeekWithMinTime(t *testing.T) {
[]tsdbutil.Sample{sample{1, 6, nil, nil}, sample{5, 6, nil, nil}, sample{6, 8, nil, nil}},
)
it := newPopulateWithDelGenericSeriesIterator(ulid.ULID{}, f, chkMetas, nil).toSeriesIterator()
it := &populateWithDelSeriesIterator{}
it.reset(ulid.ULID{}, f, chkMetas, nil)
require.Equal(t, chunkenc.ValNone, it.Seek(7))
require.Equal(t, chunkenc.ValFloat, it.Seek(3))
}
@ -958,9 +963,8 @@ func TestPopulateWithDelSeriesIterator_NextWithMinTime(t *testing.T) {
[]tsdbutil.Sample{sample{1, 6, nil, nil}, sample{5, 6, nil, nil}, sample{7, 8, nil, nil}},
)
it := newPopulateWithDelGenericSeriesIterator(
ulid.ULID{}, f, chkMetas, tombstones.Intervals{{Mint: math.MinInt64, Maxt: 2}}.Add(tombstones.Interval{Mint: 4, Maxt: math.MaxInt64}),
).toSeriesIterator()
it := &populateWithDelSeriesIterator{}
it.reset(ulid.ULID{}, f, chkMetas, tombstones.Intervals{{Mint: math.MinInt64, Maxt: 2}}.Add(tombstones.Interval{Mint: 4, Maxt: math.MaxInt64}))
require.Equal(t, chunkenc.ValNone, it.Next())
}
@ -1433,9 +1437,10 @@ func BenchmarkQuerySeek(b *testing.B) {
b.ResetTimer()
b.ReportAllocs()
var it chunkenc.Iterator
ss := sq.Select(false, nil, labels.MustNewMatcher(labels.MatchRegexp, "__name__", ".*"))
for ss.Next() {
it := ss.At().Iterator()
it = ss.At().Iterator(it)
for t := mint; t <= maxt; t++ {
it.Seek(t)
}
@ -2042,11 +2047,13 @@ func benchQuery(b *testing.B, expExpansions int, q storage.Querier, selectors la
for i := 0; i < b.N; i++ {
ss := q.Select(false, nil, selectors...)
var actualExpansions int
var it chunkenc.Iterator
for ss.Next() {
s := ss.At()
s.Labels()
it := s.Iterator()
it = s.Iterator(it)
for it.Next() != chunkenc.ValNone {
_, _ = it.At()
}
actualExpansions++
}
@ -2222,11 +2229,12 @@ func TestBlockBaseSeriesSet(t *testing.T) {
i := 0
for bcs.Next() {
chks := bcs.currIterFn().chks
si := populateWithDelGenericSeriesIterator{}
si.reset(bcs.blockID, bcs.chunks, bcs.curr.chks, bcs.curr.intervals)
idx := tc.expIdxs[i]
require.Equal(t, tc.series[idx].lset, bcs.currLabels)
require.Equal(t, tc.series[idx].chunks, chks)
require.Equal(t, tc.series[idx].lset, bcs.curr.labels)
require.Equal(t, tc.series[idx].chunks, si.chks)
i++
}


@ -49,10 +49,11 @@ func CreateBlock(series []storage.Series, dir string, chunkRange int64, logger l
const commitAfter = 10000
ctx := context.Background()
app := w.Appender(ctx)
var it chunkenc.Iterator
for _, s := range series {
ref := storage.SeriesRef(0)
it := s.Iterator()
it = s.Iterator(it)
lset := s.Labels()
typ := it.Next()
lastTyp := typ

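CreateBlock above (and the test helpers that mirror it) now re-use one iterator while copying whole series into an appender. A sketch of that copy loop for float samples only, against the post-change interfaces (appendFloats is illustrative; histogram samples are skipped to keep it short):

package example

import (
	"github.com/prometheus/prometheus/storage"
	"github.com/prometheus/prometheus/tsdb/chunkenc"
)

// appendFloats copies every float sample of each series into app, re-using
// one iterator across series as CreateBlock does above.
func appendFloats(app storage.Appender, series []storage.Series) error {
	var it chunkenc.Iterator
	for _, s := range series {
		it = s.Iterator(it)
		ref := storage.SeriesRef(0)
		for typ := it.Next(); typ != chunkenc.ValNone; typ = it.Next() {
			if typ != chunkenc.ValFloat {
				continue // histograms omitted in this sketch
			}
			t, v := it.At()
			var err error
			ref, err = app.Append(ref, s.Labels(), t, v)
			if err != nil {
				return err
			}
		}
		if err := it.Err(); err != nil {
			return err
		}
	}
	return app.Commit()
}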

@ -102,12 +102,14 @@ func (h *Handler) federation(w http.ResponseWriter, req *http.Request) {
set := storage.NewMergeSeriesSet(sets, storage.ChainedSeriesMerge)
it := storage.NewBuffer(int64(h.lookbackDelta / 1e6))
var chkIter chunkenc.Iterator
for set.Next() {
s := set.At()
// TODO(fabxc): allow fast path for most recent sample either
// in the storage itself or caching layer in Prometheus.
it.Reset(s.Iterator())
chkIter = s.Iterator(chkIter)
it.Reset(chkIter)
var t int64
var v float64