diff --git a/tsdb/ooo_head_read.go b/tsdb/ooo_head_read.go
index dd45a9ced1..59e33111e4 100644
--- a/tsdb/ooo_head_read.go
+++ b/tsdb/ooo_head_read.go
@@ -29,8 +29,6 @@ import (
 	"github.com/prometheus/prometheus/tsdb/tombstones"
 )
 
-var _ IndexReader = &OOOHeadIndexReader{}
-
 // OOOHeadIndexReader implements IndexReader so ooo samples in the head can be
 // accessed.
 // It also has a reference to headIndexReader so we can leverage on its
@@ -42,8 +40,6 @@ type OOOHeadIndexReader struct {
 	lastGarbageCollectedMmapRef chunks.ChunkDiskMapperRef
 }
 
-var _ chunkenc.Iterable = &mergedOOOChunks{}
-
 // mergedOOOChunks holds the list of iterables for overlapping chunks.
 type mergedOOOChunks struct {
 	chunkIterables []chunkenc.Iterable
@@ -66,6 +62,53 @@ func (oh *OOOHeadIndexReader) Series(ref storage.SeriesRef, builder *labels.Scra
 	return oh.series(ref, builder, chks, oh.lastGarbageCollectedMmapRef, 0)
 }
 
+type MultiChunk struct {
+	chunks []chunkenc.Chunk
+}
+
+func (c MultiChunk) Iterator(it chunkenc.Iterator) chunkenc.Iterator {
+	switch len(c.chunks) {
+	case 0:
+		return chunkenc.NewNopIterator()
+	case 1:
+		return c.chunks[0].Iterator(it)
+	default:
+		iterators := make([]chunkenc.Iterator, 0, len(c.chunks))
+		for _, chk := range c.chunks {
+			iterators = append(iterators, chk.Iterator(nil))
+		}
+		return storage.ChainSampleIteratorFromIterators(it, iterators)
+	}
+}
+
+func (c MultiChunk) Appender() (chunkenc.Appender, error) {
+	return nil, errors.New("not implemented")
+}
+
+func (c MultiChunk) Bytes() []byte {
+	return nil
+}
+
+func (c MultiChunk) Compact() {
+	// no-op
+}
+
+func (c MultiChunk) Encoding() chunkenc.Encoding {
+	return chunkenc.EncNone
+}
+
+func (c MultiChunk) NumSamples() int {
+	sum := 0
+	for _, chk := range c.chunks {
+		sum += chk.NumSamples()
+	}
+	return sum
+}
+
+func (c MultiChunk) Reset([]byte) {
+	// no-op
+}
+
 // lastGarbageCollectedMmapRef gives the last mmap chunk that may be being garbage collected and so
 // any chunk at or before this ref will not be considered. 0 disables this check.
 //
@@ -109,14 +152,16 @@ func (oh *OOOHeadIndexReader) series(ref storage.SeriesRef, builder *labels.Scra
 		if c.OverlapsClosedInterval(oh.mint, oh.maxt) && maxMmapRef == 0 {
 			ref := chunks.ChunkRef(chunks.NewHeadChunkRef(s.ref, s.oooHeadChunkID(len(s.ooo.oooMmappedChunks))))
 			if len(c.chunk.samples) > 0 { // Empty samples happens in tests, at least.
+				headChunks := MultiChunk{}
 				chks, err := s.ooo.oooHeadChunk.chunk.ToEncodedChunks(c.minTime, c.maxTime)
 				if err != nil {
 					handleChunkWriteError(err)
 					return nil
 				}
 				for _, chk := range chks {
-					addChunk(chk.minTime, chk.maxTime, ref, chk.chunk)
+					headChunks.chunks = append(headChunks.chunks, chk.chunk)
 				}
+				addChunk(c.minTime, c.maxTime, ref, headChunks)
 			} else {
 				var emptyChunk chunkenc.Chunk
 				addChunk(c.minTime, c.maxTime, ref, emptyChunk)
diff --git a/tsdb/ooo_head_read_test.go b/tsdb/ooo_head_read_test.go
index ec73a59f8b..f8ce8ecb82 100644
--- a/tsdb/ooo_head_read_test.go
+++ b/tsdb/ooo_head_read_test.go
@@ -31,6 +31,13 @@ import (
 	"github.com/prometheus/prometheus/tsdb/wlog"
 )
 
+// Type assertions.
+var (
+	_ chunkenc.Chunk    = &MultiChunk{}
+	_ chunkenc.Iterable = &mergedOOOChunks{}
+	_ IndexReader       = &OOOHeadIndexReader{}
+)
+
 type chunkInterval struct {
 	// because we permutate the order of chunks, we cannot determine at test declaration time which chunkRefs we expect in the Output.
 	// This ID matches expected output chunks against test input chunks, the test runner will assert the chunkRef for the matching chunk
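
A minimal usage sketch, not part of the patch: it shows how the new MultiChunk presents several encoded OOO head chunks as a single chunkenc.Chunk whose iterator merges samples in timestamp order via storage.ChainSampleIteratorFromIterators. The helper newXORChunk and the function demoMultiChunk are made up for illustration, and the code assumes it sits in package tsdb, since MultiChunk's chunks field is unexported.

package tsdb

import (
	"fmt"

	"github.com/prometheus/prometheus/tsdb/chunkenc"
)

// newXORChunk is a hypothetical helper for this sketch: it builds an XOR chunk
// from (timestamp, value) pairs.
func newXORChunk(samples ...[2]int64) chunkenc.Chunk {
	c := chunkenc.NewXORChunk()
	app, err := c.Appender()
	if err != nil {
		panic(err)
	}
	for _, s := range samples {
		app.Append(s[0], float64(s[1]))
	}
	return c
}

// demoMultiChunk wraps two overlapping chunks, as ToEncodedChunks might return
// for an out-of-order head chunk, and iterates the merged result.
func demoMultiChunk() {
	mc := MultiChunk{chunks: []chunkenc.Chunk{
		newXORChunk([2]int64{1, 10}, [2]int64{3, 30}),
		newXORChunk([2]int64{2, 20}, [2]int64{4, 40}),
	}}

	it := mc.Iterator(nil)
	for it.Next() == chunkenc.ValFloat {
		t, v := it.At()
		fmt.Println(t, v) // samples come out in timestamp order: 1, 2, 3, 4
	}
}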