diff --git a/tsdb/block.go b/tsdb/block.go index c55e22ce5..2f32733f8 100644 --- a/tsdb/block.go +++ b/tsdb/block.go @@ -467,11 +467,6 @@ func (pb *Block) setCompactionFailed() error { return nil } -// Querier implements Queryable. -func (pb *Block) Querier(mint, maxt int64) (storage.Querier, error) { - return NewBlockQuerier(pb, mint, maxt) -} - type blockIndexReader struct { ir IndexReader b *Block diff --git a/tsdb/head_read.go b/tsdb/head_read.go index 977d6b978..47f12df99 100644 --- a/tsdb/head_read.go +++ b/tsdb/head_read.go @@ -199,19 +199,18 @@ func (h *headIndexReader) Series(ref storage.SeriesRef, builder *labels.ScratchB defer s.Unlock() *chks = (*chks)[:0] - - getSeriesChunks(s, h.mint, h.maxt, chks) + *chks = appendSeriesChunks(s, h.mint, h.maxt, *chks) return nil } -func getSeriesChunks(s *memSeries, mint, maxt int64, chks *[]chunks.Meta) { +func appendSeriesChunks(s *memSeries, mint, maxt int64, chks []chunks.Meta) []chunks.Meta { for i, c := range s.mmappedChunks { // Do not expose chunks that are outside of the specified range. if !c.OverlapsClosedInterval(mint, maxt) { continue } - *chks = append(*chks, chunks.Meta{ + chks = append(chks, chunks.Meta{ MinTime: c.minTime, MaxTime: c.maxTime, Ref: chunks.ChunkRef(chunks.NewHeadChunkRef(s.ref, s.headChunkID(i))), @@ -230,7 +229,7 @@ func getSeriesChunks(s *memSeries, mint, maxt int64, chks *[]chunks.Meta) { maxTime = chk.maxTime } if chk.OverlapsClosedInterval(mint, maxt) { - *chks = append(*chks, chunks.Meta{ + chks = append(chks, chunks.Meta{ MinTime: chk.minTime, MaxTime: maxTime, Ref: chunks.ChunkRef(chunks.NewHeadChunkRef(s.ref, s.headChunkID(len(s.mmappedChunks)+j))), @@ -239,6 +238,7 @@ func getSeriesChunks(s *memSeries, mint, maxt int64, chks *[]chunks.Meta) { j++ } } + return chks } // headChunkID returns the HeadChunkID referred to by the given position. @@ -259,7 +259,7 @@ func (s *memSeries) oooHeadChunkID(pos int) chunks.HeadChunkID { return (chunks.HeadChunkID(pos) + s.ooo.firstOOOChunkID) | oooChunkIDMask } -func unpackHeadChunkRef(ref chunks.ChunkRef) (chunks.HeadSeriesRef, chunks.HeadChunkID, bool) { +func unpackHeadChunkRef(ref chunks.ChunkRef) (seriesID chunks.HeadSeriesRef, chunkID chunks.HeadChunkID, isOOO bool) { sid, cid := chunks.HeadChunkRef(ref).Unpack() return sid, (cid & (oooChunkIDMask - 1)), (cid & oooChunkIDMask) != 0 } @@ -481,14 +481,14 @@ func (s *memSeries) chunk(id chunks.HeadChunkID, chunkDiskMapper *chunks.ChunkDi return elem, true, offset == 0, nil } -// oooMergedChunks return an iterable over one or more OOO chunks for the given +// mergedChunks return an iterable over one or more OOO chunks for the given // chunks.Meta reference from memory or by m-mapping it from the disk. The // returned iterable will be a merge of all the overlapping chunks, if any, // amongst all the chunks in the OOOHead. // If hr is non-nil then in-order chunks are included. // This function is not thread safe unless the caller holds a lock. // The caller must ensure that s.ooo is not nil. -func (s *memSeries) oooMergedChunks(meta chunks.Meta, cdm *chunks.ChunkDiskMapper, hr *headChunkReader, mint, maxt int64, maxMmapRef chunks.ChunkDiskMapperRef) (*mergedOOOChunks, error) { +func (s *memSeries) mergedChunks(meta chunks.Meta, cdm *chunks.ChunkDiskMapper, hr *headChunkReader, mint, maxt int64, maxMmapRef chunks.ChunkDiskMapperRef) (chunkenc.Iterable, error) { _, cid, _ := unpackHeadChunkRef(meta.Ref) // ix represents the index of chunk in the s.mmappedChunks slice. The chunk meta's are @@ -531,8 +531,7 @@ func (s *memSeries) oooMergedChunks(meta chunks.Meta, cdm *chunks.ChunkDiskMappe } if hr != nil { // Include in-order chunks. - var metas []chunks.Meta - getSeriesChunks(s, max(meta.MinTime, mint), min(meta.MaxTime, maxt), &metas) + metas := appendSeriesChunks(s, max(meta.MinTime, mint), min(meta.MaxTime, maxt), nil) for _, m := range metas { tmpChks = append(tmpChks, chunkMetaAndChunkDiskMapperRef{ meta: m, diff --git a/tsdb/ooo_head_read.go b/tsdb/ooo_head_read.go index aaaa24963..47e2efb86 100644 --- a/tsdb/ooo_head_read.go +++ b/tsdb/ooo_head_read.go @@ -77,7 +77,7 @@ func (oh *HeadAndOOOIndexReader) Series(ref storage.SeriesRef, builder *labels.S if s.ooo != nil { return getOOOSeriesChunks(s, oh.mint, oh.maxt, oh.lastGarbageCollectedMmapRef, 0, true, chks) } - getSeriesChunks(s, oh.mint, oh.maxt, chks) + *chks = appendSeriesChunks(s, oh.mint, oh.maxt, *chks) return nil } @@ -127,7 +127,7 @@ func getOOOSeriesChunks(s *memSeries, mint, maxt int64, lastGarbageCollectedMmap } if includeInOrder { - getSeriesChunks(s, mint, maxt, &tmpChks) + tmpChks = appendSeriesChunks(s, mint, maxt, tmpChks) } // There is nothing to do if we did not collect any chunk. @@ -253,13 +253,14 @@ func (cr *HeadAndOOOChunkReader) ChunkOrIterable(meta chunks.Meta) (chunkenc.Chu } s.Lock() - mc, err := s.oooMergedChunks(meta, cr.head.chunkDiskMapper, cr.cr, cr.mint, cr.maxt, cr.maxMmapRef) + mc, err := s.mergedChunks(meta, cr.head.chunkDiskMapper, cr.cr, cr.mint, cr.maxt, cr.maxMmapRef) s.Unlock() return nil, mc, err } -// Pass through special behaviour for current head chunk. +// ChunkOrIterableWithCopy: implements ChunkReaderWithCopy. The special Copy behaviour +// is only implemented for the in-order head chunk. func (cr *HeadAndOOOChunkReader) ChunkOrIterableWithCopy(meta chunks.Meta) (chunkenc.Chunk, chunkenc.Iterable, int64, error) { _, _, isOOO := unpackHeadChunkRef(meta.Ref) if !isOOO { diff --git a/tsdb/querier_bench_test.go b/tsdb/querier_bench_test.go index e3e457d07..43accc253 100644 --- a/tsdb/querier_bench_test.go +++ b/tsdb/querier_bench_test.go @@ -321,10 +321,17 @@ func BenchmarkQuerierSelect(b *testing.B) { require.NoError(b, block.Close()) }() - benchmarkSelect(b, block, numSeries, false) + benchmarkSelect(b, (*queryableBlock)(block), numSeries, false) }) } +// Type wrapper to let a Block be a Queryable in benchmarkSelect(). +type queryableBlock Block + +func (pb *queryableBlock) Querier(mint, maxt int64) (storage.Querier, error) { + return NewBlockQuerier((*Block)(pb), mint, maxt) +} + func BenchmarkQuerierSelectWithOutOfOrder(b *testing.B) { numSeries := 1000000 _, db := createHeadForBenchmarkSelect(b, numSeries, func(app storage.Appender, i int) {