Change ChunkReader.Chunk() to ChunkOrIterable()

The ChunkReader interface's Chunk() method has been changed to ChunkOrIterable().

This is a precursor to OOO native histogram support: with OOO native histograms, the chunks.Meta passed to Chunk() can correspond to multiple chunks rather than just a single one (e.g. if oooMergedChunk has a counter reset in the middle, since a histogram chunk can only contain one counter reset).

To support this, ChunkOrIterable() returns either a single chunk or an iterable. If an iterable is returned, the caller is responsible for converting the samples from the iterable into possibly multiple chunks (see the sketch below). The OOOHeadChunkReader now returns an iterable rather than a chunk, to prepare for the native histograms case. As a beneficial side effect, oooMergedChunk and boundedChunk have been simplified, as they now only need to implement the Iterable interface rather than the full Chunk interface.
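
A minimal caller-side sketch of the new contract (the helper name readChunks and the float-only re-encoding are illustrative, not part of this commit; the full multi-chunk logic, including histogram counter-reset handling, lives in populateWithDelChunkSeriesIterator below):

import (
	"fmt"

	"github.com/prometheus/prometheus/tsdb"
	"github.com/prometheus/prometheus/tsdb/chunkenc"
	"github.com/prometheus/prometheus/tsdb/chunks"
)

// readChunks materializes the data behind a chunks.Meta into concrete chunks.
func readChunks(cr tsdb.ChunkReader, meta chunks.Meta) ([]chunkenc.Chunk, error) {
	chk, iterable, err := cr.ChunkOrIterable(meta)
	if err != nil {
		return nil, err
	}
	if chk != nil {
		// Fast path: the meta mapped to a single stored chunk that needs
		// no rewrite.
		return []chunkenc.Chunk{chk}, nil
	}
	// Slow path: re-encode the iterable's samples. Floats always fit into
	// one XOR chunk; histogram samples would additionally need a new chunk
	// cut at every counter reset.
	xc := chunkenc.NewXORChunk()
	app, err := xc.Appender()
	if err != nil {
		return nil, err
	}
	it := iterable.Iterator(nil)
	for vt := it.Next(); vt != chunkenc.ValNone; vt = it.Next() {
		if vt != chunkenc.ValFloat {
			return nil, fmt.Errorf("sketch only handles float samples, got %v", vt)
		}
		t, v := it.At()
		app.Append(t, v)
	}
	if err := it.Err(); err != nil {
		return nil, err
	}
	return []chunkenc.Chunk{xc}, nil
}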

---------

Signed-off-by: Fiona Liao <fiona.y.liao@gmail.com>
Co-authored-by: George Krajcsovits <krajorama@users.noreply.github.com>
Fiona Liao, 2023-11-28 10:14:29 +00:00, committed by GitHub
commit 5bee0cfce2 (parent ecc37588b0)
17 changed files with 819 additions and 221 deletions


@ -16,6 +16,7 @@ package main
import (
"bufio"
"context"
"errors"
"fmt"
"io"
"os"
@ -643,10 +644,15 @@ func analyzeCompaction(ctx context.Context, block tsdb.BlockReader, indexr tsdb.
for _, chk := range chks {
// Load the actual data of the chunk.
chk, err := chunkr.Chunk(chk)
chk, iterable, err := chunkr.ChunkOrIterable(chk)
if err != nil {
return err
}
// Chunks within blocks should not need to be re-written, so an
// iterable is not expected to be returned from the chunk reader.
if iterable != nil {
return errors.New("ChunkOrIterable should not return an iterable when reading a block")
}
switch chk.Encoding() {
case chunkenc.EncXOR:
floatChunkSamplesCount = append(floatChunkSamplesCount, chk.NumSamples())


@ -473,10 +473,10 @@ func ChainSampleIteratorFromSeries(it chunkenc.Iterator, series []Series) chunke
return csi
}
func ChainSampleIteratorFromMetas(it chunkenc.Iterator, chunks []chunks.Meta) chunkenc.Iterator {
csi := getChainSampleIterator(it, len(chunks))
for i, c := range chunks {
csi.iterators[i] = c.Chunk.Iterator(csi.iterators[i])
func ChainSampleIteratorFromIterables(it chunkenc.Iterator, iterables []chunkenc.Iterable) chunkenc.Iterator {
csi := getChainSampleIterator(it, len(iterables))
for i, c := range iterables {
csi.iterators[i] = c.Iterator(csi.iterators[i])
}
return csi
}
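
For illustration, a hypothetical caller that merges two iterables into one time-ordered iterator (mergeIterables is not part of this commit):

func mergeIterables(a, b chunkenc.Iterable) chunkenc.Iterator {
	// Passing nil makes the function allocate a fresh chain iterator; an
	// existing one can be passed instead for re-use.
	return storage.ChainSampleIteratorFromIterables(nil, []chunkenc.Iterable{a, b})
}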


@ -117,8 +117,19 @@ type ChunkWriter interface {
// ChunkReader provides reading access of serialized time series data.
type ChunkReader interface {
// Chunk returns the series data chunk with the given reference.
Chunk(meta chunks.Meta) (chunkenc.Chunk, error)
// ChunkOrIterable returns the series data for the given chunks.Meta.
// Either a single chunk will be returned, or an iterable.
// A single chunk should be returned if chunks.Meta maps to a chunk that
// already exists and doesn't need modifications.
// An iterable should be returned if chunks.Meta maps to a subset of the
// samples in a stored chunk, or multiple chunks. (E.g. OOOHeadChunkReader
// could return an iterable where multiple histogram samples have counter
// resets. There can only be one counter reset per histogram chunk so
// multiple chunks would be created from the iterable in this case.)
// Only one of chunk or iterable should be returned. In some cases you may
// always expect a chunk to be returned. You can check that iterable is nil
// in those cases.
ChunkOrIterable(meta chunks.Meta) (chunkenc.Chunk, chunkenc.Iterable, error)
// Close releases all underlying resources of the reader.
Close() error
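
Implementations that can always produce a single chunk migrate mechanically. A hypothetical adapter around a reader with the old Chunk() signature (legacyReader and legacyAdapter are illustrative names):

type legacyReader interface {
	Chunk(meta chunks.Meta) (chunkenc.Chunk, error)
	Close() error
}

type legacyAdapter struct{ r legacyReader }

// ChunkOrIterable always returns a nil iterable, so callers that expect a
// plain chunk can simply assert iterable == nil.
func (a legacyAdapter) ChunkOrIterable(meta chunks.Meta) (chunkenc.Chunk, chunkenc.Iterable, error) {
	chk, err := a.r.Chunk(meta)
	return chk, nil, err
}

func (a legacyAdapter) Close() error { return a.r.Close() }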


@ -501,6 +501,19 @@ func createBlockFromHead(tb testing.TB, dir string, head *Head) string {
return filepath.Join(dir, ulid.String())
}
func createBlockFromOOOHead(tb testing.TB, dir string, head *OOOCompactionHead) string {
compactor, err := NewLeveledCompactor(context.Background(), nil, log.NewNopLogger(), []int64{1000000}, nil, nil)
require.NoError(tb, err)
require.NoError(tb, os.MkdirAll(dir, 0o777))
// Add +1 millisecond to block maxt because block intervals are half-open: [b.MinTime, b.MaxTime).
// Because of this block intervals are always +1 than the total samples it includes.
ulid, err := compactor.Write(dir, head, head.MinTime(), head.MaxTime()+1, nil)
require.NoError(tb, err)
return filepath.Join(dir, ulid.String())
}
func createHead(tb testing.TB, w *wlog.WL, series []storage.Series, chunkDir string) *Head {
opts := DefaultHeadOptions()
opts.ChunkDirRoot = chunkDir


@ -67,6 +67,8 @@ const (
// Chunk holds a sequence of sample pairs that can be iterated over and appended to.
type Chunk interface {
Iterable
// Bytes returns the underlying byte slice of the chunk.
Bytes() []byte
@ -76,11 +78,6 @@ type Chunk interface {
// Appender returns an appender to append samples to the chunk.
Appender() (Appender, error)
// The iterator passed as argument is for re-use.
// Depending on implementation, the iterator can
// be re-used or a new iterator can be allocated.
Iterator(Iterator) Iterator
// NumSamples returns the number of samples in the chunk.
NumSamples() int
@ -92,6 +89,13 @@ type Chunk interface {
Compact()
}
type Iterable interface {
// The iterator passed as argument is for re-use.
// Depending on implementation, the iterator can
// be re-used or a new iterator can be allocated.
Iterator(Iterator) Iterator
}
// Appender adds sample pairs to a chunk.
type Appender interface {
Append(int64, float64)
@ -184,6 +188,19 @@ func (v ValueType) ChunkEncoding() Encoding {
}
}
func (v ValueType) NewChunk() (Chunk, error) {
switch v {
case ValFloat:
return NewXORChunk(), nil
case ValHistogram:
return NewHistogramChunk(), nil
case ValFloatHistogram:
return NewFloatHistogramChunk(), nil
default:
return nil, fmt.Errorf("value type %v unsupported", v)
}
}
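
For illustration, a hypothetical helper showing how NewChunk() pairs with Appender() when cutting a chunk for a new value type (newChunkFor is not part of this commit; populateChunksFromIterable below does the real work):

// newChunkFor allocates a chunk and appender for the given sample value type.
func newChunkFor(vt ValueType) (Chunk, Appender, error) {
	chk, err := vt.NewChunk() // Fails for ValNone.
	if err != nil {
		return nil, nil, err
	}
	app, err := chk.Appender()
	if err != nil {
		return nil, nil, err
	}
	return chk, app, nil
}
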
// MockSeriesIterator returns an iterator for a mock series with custom timeStamps and values.
func MockSeriesIterator(timestamps []int64, values []float64) Iterator {
return &mockSeriesIterator{


@ -117,11 +117,16 @@ func (b BlockChunkRef) Unpack() (int, int) {
return sgmIndex, chkStart
}
// Meta holds information about a chunk of data.
// Meta holds information about one or more chunks.
// For examples of when chunks.Meta could refer to multiple chunks, see
// ChunkReader.ChunkOrIterable().
type Meta struct {
// Ref and Chunk hold either a reference that can be used to retrieve
// chunk data or the data itself.
// If Chunk is nil, call ChunkReader.Chunk(Meta.Ref) to get the chunk and assign it to the Chunk field
// If Chunk is nil, call ChunkReader.ChunkOrIterable(Meta.Ref) to get the
// chunk and assign it to the Chunk field. If an iterable is returned from
// that method, then it may not be possible to set Chunk as the iterable
// might form several chunks.
Ref ChunkRef
Chunk chunkenc.Chunk
@ -667,24 +672,24 @@ func (s *Reader) Size() int64 {
}
// Chunk returns a chunk from a given reference.
func (s *Reader) Chunk(meta Meta) (chunkenc.Chunk, error) {
func (s *Reader) ChunkOrIterable(meta Meta) (chunkenc.Chunk, chunkenc.Iterable, error) {
sgmIndex, chkStart := BlockChunkRef(meta.Ref).Unpack()
if sgmIndex >= len(s.bs) {
return nil, fmt.Errorf("segment index %d out of range", sgmIndex)
return nil, nil, fmt.Errorf("segment index %d out of range", sgmIndex)
}
sgmBytes := s.bs[sgmIndex]
if chkStart+MaxChunkLengthFieldSize > sgmBytes.Len() {
return nil, fmt.Errorf("segment doesn't include enough bytes to read the chunk size data field - required:%v, available:%v", chkStart+MaxChunkLengthFieldSize, sgmBytes.Len())
return nil, nil, fmt.Errorf("segment doesn't include enough bytes to read the chunk size data field - required:%v, available:%v", chkStart+MaxChunkLengthFieldSize, sgmBytes.Len())
}
// With the minimum chunk length this should never cause us reading
// over the end of the slice.
c := sgmBytes.Range(chkStart, chkStart+MaxChunkLengthFieldSize)
chkDataLen, n := binary.Uvarint(c)
if n <= 0 {
return nil, fmt.Errorf("reading chunk length failed with %d", n)
return nil, nil, fmt.Errorf("reading chunk length failed with %d", n)
}
chkEncStart := chkStart + n
@ -693,17 +698,18 @@ func (s *Reader) Chunk(meta Meta) (chunkenc.Chunk, error) {
chkDataEnd := chkEnd - crc32.Size
if chkEnd > sgmBytes.Len() {
return nil, fmt.Errorf("segment doesn't include enough bytes to read the chunk - required:%v, available:%v", chkEnd, sgmBytes.Len())
return nil, nil, fmt.Errorf("segment doesn't include enough bytes to read the chunk - required:%v, available:%v", chkEnd, sgmBytes.Len())
}
sum := sgmBytes.Range(chkDataEnd, chkEnd)
if err := checkCRC32(sgmBytes.Range(chkEncStart, chkDataEnd), sum); err != nil {
return nil, err
return nil, nil, err
}
chkData := sgmBytes.Range(chkDataStart, chkDataEnd)
chkEnc := sgmBytes.Range(chkEncStart, chkEncStart+ChunkEncodingSize)[0]
return s.pool.Get(chunkenc.Encoding(chkEnc), chkData)
chk, err := s.pool.Get(chunkenc.Encoding(chkEnc), chkData)
return chk, nil, err
}
func nextSequenceFile(dir string) (string, int, error) {


@ -23,6 +23,6 @@ func TestReaderWithInvalidBuffer(t *testing.T) {
b := realByteSlice([]byte{0x81, 0x81, 0x81, 0x81, 0x81, 0x81})
r := &Reader{bs: []ByteSlice{b}}
_, err := r.Chunk(Meta{Ref: 0})
_, _, err := r.ChunkOrIterable(Meta{Ref: 0})
require.Error(t, err)
}


@ -1144,6 +1144,46 @@ func BenchmarkCompactionFromHead(b *testing.B) {
}
}
func BenchmarkCompactionFromOOOHead(b *testing.B) {
dir := b.TempDir()
totalSeries := 100000
totalSamples := 100
for labelNames := 1; labelNames < totalSeries; labelNames *= 10 {
labelValues := totalSeries / labelNames
b.Run(fmt.Sprintf("labelnames=%d,labelvalues=%d", labelNames, labelValues), func(b *testing.B) {
chunkDir := b.TempDir()
opts := DefaultHeadOptions()
opts.ChunkRange = 1000
opts.ChunkDirRoot = chunkDir
opts.OutOfOrderTimeWindow.Store(int64(totalSamples))
h, err := NewHead(nil, nil, nil, nil, opts, nil)
require.NoError(b, err)
for ln := 0; ln < labelNames; ln++ {
app := h.Appender(context.Background())
for lv := 0; lv < labelValues; lv++ {
lbls := labels.FromStrings(fmt.Sprintf("%d", ln), fmt.Sprintf("%d%s%d", lv, postingsBenchSuffix, ln))
_, err = app.Append(0, lbls, int64(totalSamples), 0)
require.NoError(b, err)
for ts := 0; ts < totalSamples; ts++ {
_, err = app.Append(0, lbls, int64(ts), float64(ts))
require.NoError(b, err)
}
}
require.NoError(b, app.Commit())
}
b.ResetTimer()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
oooHead, err := NewOOOCompactionHead(context.TODO(), h)
require.NoError(b, err)
createBlockFromOOOHead(b, filepath.Join(dir, fmt.Sprintf("%d-%d", i, labelNames)), oooHead)
}
h.Close()
})
}
}
// TestDisableAutoCompactions checks that we can
// disable and enable the auto compaction.
// This is needed for unit tests that rely on


@ -2909,8 +2909,9 @@ func TestChunkWriter_ReadAfterWrite(t *testing.T) {
for _, chks := range test.chks {
for _, chkExp := range chks {
chkAct, err := r.Chunk(chkExp)
chkAct, iterable, err := r.ChunkOrIterable(chkExp)
require.NoError(t, err)
require.Nil(t, iterable)
require.Equal(t, chkExp.Chunk.Bytes(), chkAct.Bytes())
}
}
@ -2969,8 +2970,9 @@ func TestChunkReader_ConcurrentReads(t *testing.T) {
go func(chunk chunks.Meta) {
defer wg.Done()
chkAct, err := r.Chunk(chunk)
chkAct, iterable, err := r.ChunkOrIterable(chunk)
require.NoError(t, err)
require.Nil(t, iterable)
require.Equal(t, chunk.Chunk.Bytes(), chkAct.Bytes())
}(chk)
}


@ -289,10 +289,10 @@ func (h *headChunkReader) Close() error {
return nil
}
// Chunk returns the chunk for the reference number.
func (h *headChunkReader) Chunk(meta chunks.Meta) (chunkenc.Chunk, error) {
// ChunkOrIterable returns the chunk for the reference number.
func (h *headChunkReader) ChunkOrIterable(meta chunks.Meta) (chunkenc.Chunk, chunkenc.Iterable, error) {
chk, _, err := h.chunk(meta, false)
return chk, err
return chk, nil, err
}
// ChunkWithCopy returns the chunk for the reference number.
@ -416,13 +416,13 @@ func (s *memSeries) chunk(id chunks.HeadChunkID, chunkDiskMapper *chunks.ChunkDi
return elem, true, offset == 0, nil
}
// oooMergedChunk returns the requested chunk based on the given chunks.Meta
// reference from memory or by m-mapping it from the disk. The returned chunk
// might be a merge of all the overlapping chunks, if any, amongst all the
// chunks in the OOOHead.
// oooMergedChunks return an iterable over one or more OOO chunks for the given
// chunks.Meta reference from memory or by m-mapping it from the disk. The
// returned iterable will be a merge of all the overlapping chunks, if any,
// amongst all the chunks in the OOOHead.
// This function is not thread safe unless the caller holds a lock.
// The caller must ensure that s.ooo is not nil.
func (s *memSeries) oooMergedChunk(meta chunks.Meta, cdm *chunks.ChunkDiskMapper, mint, maxt int64) (chunk *mergedOOOChunks, err error) {
func (s *memSeries) oooMergedChunks(meta chunks.Meta, cdm *chunks.ChunkDiskMapper, mint, maxt int64) (*mergedOOOChunks, error) {
_, cid := chunks.HeadChunkRef(meta.Ref).Unpack()
// ix represents the index of chunk in the s.mmappedChunks slice. The chunk meta's are
@ -499,11 +499,13 @@ func (s *memSeries) oooMergedChunk(meta chunks.Meta, cdm *chunks.ChunkDiskMapper
mc := &mergedOOOChunks{}
absoluteMax := int64(math.MinInt64)
for _, c := range tmpChks {
if c.meta.Ref != meta.Ref && (len(mc.chunks) == 0 || c.meta.MinTime > absoluteMax) {
if c.meta.Ref != meta.Ref && (len(mc.chunkIterables) == 0 || c.meta.MinTime > absoluteMax) {
continue
}
var iterable chunkenc.Iterable
if c.meta.Ref == oooHeadRef {
var xor *chunkenc.XORChunk
var err error
// If head chunk min and max time match the meta OOO markers
// that means that the chunk has not expanded so we can append
// it as it is.
@ -516,7 +518,7 @@ func (s *memSeries) oooMergedChunk(meta chunks.Meta, cdm *chunks.ChunkDiskMapper
if err != nil {
return nil, errors.Wrap(err, "failed to convert ooo head chunk to xor chunk")
}
c.meta.Chunk = xor
iterable = xor
} else {
chk, err := cdm.Chunk(c.ref)
if err != nil {
@ -531,12 +533,12 @@ func (s *memSeries) oooMergedChunk(meta chunks.Meta, cdm *chunks.ChunkDiskMapper
// wrap the chunk within a chunk that doesnt allows us to iterate
// through samples out of the OOOLastMinT and OOOLastMaxT
// markers.
c.meta.Chunk = boundedChunk{chk, meta.OOOLastMinTime, meta.OOOLastMaxTime}
iterable = boundedIterable{chk, meta.OOOLastMinTime, meta.OOOLastMaxTime}
} else {
c.meta.Chunk = chk
iterable = chk
}
}
mc.chunks = append(mc.chunks, c.meta)
mc.chunkIterables = append(mc.chunkIterables, iterable)
if c.meta.MaxTime > absoluteMax {
absoluteMax = c.meta.MaxTime
}
@ -545,77 +547,30 @@ func (s *memSeries) oooMergedChunk(meta chunks.Meta, cdm *chunks.ChunkDiskMapper
return mc, nil
}
var _ chunkenc.Chunk = &mergedOOOChunks{}
var _ chunkenc.Iterable = &mergedOOOChunks{}
// mergedOOOChunks holds the list of overlapping chunks. This struct satisfies
// chunkenc.Chunk.
// mergedOOOChunks holds the list of iterables for overlapping chunks.
type mergedOOOChunks struct {
chunks []chunks.Meta
}
// Bytes is a very expensive method because its calling the iterator of all the
// chunks in the mergedOOOChunk and building a new chunk with the samples.
func (o mergedOOOChunks) Bytes() []byte {
xc := chunkenc.NewXORChunk()
app, err := xc.Appender()
if err != nil {
panic(err)
}
it := o.Iterator(nil)
for it.Next() == chunkenc.ValFloat {
t, v := it.At()
app.Append(t, v)
}
return xc.Bytes()
}
func (o mergedOOOChunks) Encoding() chunkenc.Encoding {
return chunkenc.EncXOR
}
func (o mergedOOOChunks) Appender() (chunkenc.Appender, error) {
return nil, errors.New("can't append to mergedOOOChunks")
chunkIterables []chunkenc.Iterable
}
func (o mergedOOOChunks) Iterator(iterator chunkenc.Iterator) chunkenc.Iterator {
return storage.ChainSampleIteratorFromMetas(iterator, o.chunks)
return storage.ChainSampleIteratorFromIterables(iterator, o.chunkIterables)
}
func (o mergedOOOChunks) NumSamples() int {
samples := 0
for _, c := range o.chunks {
samples += c.Chunk.NumSamples()
}
return samples
}
var _ chunkenc.Iterable = &boundedIterable{}
func (o mergedOOOChunks) Compact() {}
var _ chunkenc.Chunk = &boundedChunk{}
// boundedChunk is an implementation of chunkenc.Chunk that uses a
// boundedIterable is an implementation of chunkenc.Iterable that uses a
// boundedIterator that only iterates through samples which timestamps are
// >= minT and <= maxT.
type boundedChunk struct {
chunkenc.Chunk
minT int64
maxT int64
type boundedIterable struct {
chunk chunkenc.Chunk
minT int64
maxT int64
}
func (b boundedChunk) Bytes() []byte {
xor := chunkenc.NewXORChunk()
a, _ := xor.Appender()
it := b.Iterator(nil)
for it.Next() == chunkenc.ValFloat {
t, v := it.At()
a.Append(t, v)
}
return xor.Bytes()
}
func (b boundedChunk) Iterator(iterator chunkenc.Iterator) chunkenc.Iterator {
it := b.Chunk.Iterator(iterator)
func (b boundedIterable) Iterator(iterator chunkenc.Iterator) chunkenc.Iterator {
it := b.chunk.Iterator(iterator)
if it == nil {
panic("iterator shouldn't be nil")
}


@ -129,21 +129,10 @@ func TestBoundedChunk(t *testing.T) {
}
for _, tc := range tests {
t.Run(fmt.Sprintf("name=%s", tc.name), func(t *testing.T) {
chunk := boundedChunk{tc.inputChunk, tc.inputMinT, tc.inputMaxT}
// Testing Bytes()
expChunk := chunkenc.NewXORChunk()
if tc.inputChunk.NumSamples() > 0 {
app, err := expChunk.Appender()
require.NoError(t, err)
for ts := tc.inputMinT; ts <= tc.inputMaxT; ts++ {
app.Append(ts, float64(ts))
}
}
require.Equal(t, expChunk.Bytes(), chunk.Bytes())
iterable := boundedIterable{tc.inputChunk, tc.inputMinT, tc.inputMaxT}
var samples []sample
it := chunk.Iterator(nil)
it := iterable.Iterator(nil)
if tc.initialSeek != 0 {
// Testing Seek()


@ -1835,16 +1835,16 @@ func TestGCChunkAccess(t *testing.T) {
cr, err := h.chunksRange(0, 1500, nil)
require.NoError(t, err)
_, err = cr.Chunk(chunks[0])
_, _, err = cr.ChunkOrIterable(chunks[0])
require.NoError(t, err)
_, err = cr.Chunk(chunks[1])
_, _, err = cr.ChunkOrIterable(chunks[1])
require.NoError(t, err)
require.NoError(t, h.Truncate(1500)) // Remove a chunk.
_, err = cr.Chunk(chunks[0])
_, _, err = cr.ChunkOrIterable(chunks[0])
require.Equal(t, storage.ErrNotFound, err)
_, err = cr.Chunk(chunks[1])
_, _, err = cr.ChunkOrIterable(chunks[1])
require.NoError(t, err)
}
@ -1894,18 +1894,18 @@ func TestGCSeriesAccess(t *testing.T) {
cr, err := h.chunksRange(0, 2000, nil)
require.NoError(t, err)
_, err = cr.Chunk(chunks[0])
_, _, err = cr.ChunkOrIterable(chunks[0])
require.NoError(t, err)
_, err = cr.Chunk(chunks[1])
_, _, err = cr.ChunkOrIterable(chunks[1])
require.NoError(t, err)
require.NoError(t, h.Truncate(2000)) // Remove the series.
require.Equal(t, (*memSeries)(nil), h.series.getByID(1))
_, err = cr.Chunk(chunks[0])
_, _, err = cr.ChunkOrIterable(chunks[0])
require.Equal(t, storage.ErrNotFound, err)
_, err = cr.Chunk(chunks[1])
_, _, err = cr.ChunkOrIterable(chunks[1])
require.Equal(t, storage.ErrNotFound, err)
}
@ -5406,8 +5406,9 @@ func TestCuttingNewHeadChunks(t *testing.T) {
require.Len(t, chkMetas, len(tc.expectedChks))
for i, expected := range tc.expectedChks {
chk, err := chkReader.Chunk(chkMetas[i])
chk, iterable, err := chkReader.ChunkOrIterable(chkMetas[i])
require.NoError(t, err)
require.Nil(t, iterable)
require.Equal(t, expected.numSamples, chk.NumSamples())
require.Len(t, chk.Bytes(), expected.numBytes)
@ -5455,8 +5456,9 @@ func TestHeadDetectsDuplicateSampleAtSizeLimit(t *testing.T) {
storedSampleCount := 0
for _, chunkMeta := range chunks {
chunk, err := chunkReader.Chunk(chunkMeta)
chunk, iterable, err := chunkReader.ChunkOrIterable(chunkMeta)
require.NoError(t, err)
require.Nil(t, iterable)
storedSampleCount += chunk.NumSamples()
}


@ -247,33 +247,33 @@ func NewOOOHeadChunkReader(head *Head, mint, maxt int64, isoState *oooIsolationS
}
}
func (cr OOOHeadChunkReader) Chunk(meta chunks.Meta) (chunkenc.Chunk, error) {
func (cr OOOHeadChunkReader) ChunkOrIterable(meta chunks.Meta) (chunkenc.Chunk, chunkenc.Iterable, error) {
sid, _ := chunks.HeadChunkRef(meta.Ref).Unpack()
s := cr.head.series.getByID(sid)
// This means that the series has been garbage collected.
if s == nil {
return nil, storage.ErrNotFound
return nil, nil, storage.ErrNotFound
}
s.Lock()
if s.ooo == nil {
// There is no OOO data for this series.
s.Unlock()
return nil, storage.ErrNotFound
return nil, nil, storage.ErrNotFound
}
c, err := s.oooMergedChunk(meta, cr.head.chunkDiskMapper, cr.mint, cr.maxt)
mc, err := s.oooMergedChunks(meta, cr.head.chunkDiskMapper, cr.mint, cr.maxt)
s.Unlock()
if err != nil {
return nil, err
return nil, nil, err
}
// This means that the query range did not overlap with the requested chunk.
if len(c.chunks) == 0 {
return nil, storage.ErrNotFound
if len(mc.chunkIterables) == 0 {
return nil, nil, storage.ErrNotFound
}
return c, nil
return nil, mc, nil
}
func (cr OOOHeadChunkReader) Close() error {


@ -486,9 +486,10 @@ func TestOOOHeadChunkReader_Chunk(t *testing.T) {
cr := NewOOOHeadChunkReader(db.head, 0, 1000, nil)
defer cr.Close()
c, err := cr.Chunk(chunks.Meta{
c, iterable, err := cr.ChunkOrIterable(chunks.Meta{
Ref: 0x1000000, Chunk: chunkenc.Chunk(nil), MinTime: 100, MaxTime: 300,
})
require.Nil(t, iterable)
require.Equal(t, err, fmt.Errorf("not found"))
require.Equal(t, c, nil)
})
@ -853,11 +854,12 @@ func TestOOOHeadChunkReader_Chunk(t *testing.T) {
cr := NewOOOHeadChunkReader(db.head, tc.queryMinT, tc.queryMaxT, nil)
defer cr.Close()
for i := 0; i < len(chks); i++ {
c, err := cr.Chunk(chks[i])
c, iterable, err := cr.ChunkOrIterable(chks[i])
require.NoError(t, err)
require.Nil(t, c)
var resultSamples chunks.SampleSlice
it := c.Iterator(nil)
it := iterable.Iterator(nil)
for it.Next() == chunkenc.ValFloat {
t, v := it.At()
resultSamples = append(resultSamples, sample{t: t, f: v})
@ -1025,11 +1027,12 @@ func TestOOOHeadChunkReader_Chunk_ConsistentQueryResponseDespiteOfHeadExpanding(
cr := NewOOOHeadChunkReader(db.head, tc.queryMinT, tc.queryMaxT, nil)
defer cr.Close()
for i := 0; i < len(chks); i++ {
c, err := cr.Chunk(chks[i])
c, iterable, err := cr.ChunkOrIterable(chks[i])
require.NoError(t, err)
require.Nil(t, c)
var resultSamples chunks.SampleSlice
it := c.Iterator(nil)
it := iterable.Iterator(nil)
for it.Next() == chunkenc.ValFloat {
ts, v := it.At()
resultSamples = append(resultSamples, sample{t: ts, f: v})


@ -632,36 +632,42 @@ func (b *blockBaseSeriesSet) Warnings() annotations.Annotations { return nil }
// populateWithDelGenericSeriesIterator assumes that chunks that would be fully
// removed by intervals are filtered out in previous phase.
//
// On each iteration currChkMeta is available. If currDelIter is not nil, it
// means that the chunk iterator in currChkMeta is invalid and a chunk rewrite
// is needed, for which currDelIter should be used.
// On each iteration currMeta is available. If currDelIter is not nil, it
// means that the chunk in currMeta is invalid and a chunk rewrite is needed,
// for which currDelIter should be used.
type populateWithDelGenericSeriesIterator struct {
blockID ulid.ULID
chunks ChunkReader
// chks are expected to be sorted by minTime and should be related to
cr ChunkReader
// metas are expected to be sorted by minTime and should be related to
// the same, single series.
chks []chunks.Meta
// It's possible for a single chunks.Meta to refer to multiple chunks.
// cr.ChunkOrIterable() would return an iterable and a nil chunk in this
// case.
metas []chunks.Meta
i int // Index into chks; -1 if not started yet.
i int // Index into metas; -1 if not started yet.
err error
bufIter DeletedIterator // Retained for memory re-use. currDelIter may point here.
intervals tombstones.Intervals
currDelIter chunkenc.Iterator
currChkMeta chunks.Meta
// currMeta is the current chunks.Meta from metas. currMeta.Chunk is set to
// the chunk returned from cr.ChunkOrIterable(). As that can return a nil
// chunk, currMeta.Chunk is not always guaranteed to be set.
currMeta chunks.Meta
}
func (p *populateWithDelGenericSeriesIterator) reset(blockID ulid.ULID, cr ChunkReader, chks []chunks.Meta, intervals tombstones.Intervals) {
p.blockID = blockID
p.chunks = cr
p.chks = chks
p.cr = cr
p.metas = chks
p.i = -1
p.err = nil
// Note we don't touch p.bufIter.Iter; it is holding on to an iterator we might reuse in next().
p.bufIter.Intervals = p.bufIter.Intervals[:0]
p.intervals = intervals
p.currDelIter = nil
p.currChkMeta = chunks.Meta{}
p.currMeta = chunks.Meta{}
}
// If copyHeadChunk is true, then the head chunk (i.e. the in-memory chunk of the TSDB)
@ -669,43 +675,54 @@ func (p *populateWithDelGenericSeriesIterator) reset(blockID ulid.ULID, cr Chunk
// However, if the deletion intervals overlaps with the head chunk, then the head chunk is
// not copied irrespective of copyHeadChunk because it will be re-encoded later anyway.
func (p *populateWithDelGenericSeriesIterator) next(copyHeadChunk bool) bool {
if p.err != nil || p.i >= len(p.chks)-1 {
if p.err != nil || p.i >= len(p.metas)-1 {
return false
}
p.i++
p.currChkMeta = p.chks[p.i]
p.currMeta = p.metas[p.i]
p.bufIter.Intervals = p.bufIter.Intervals[:0]
for _, interval := range p.intervals {
if p.currChkMeta.OverlapsClosedInterval(interval.Mint, interval.Maxt) {
if p.currMeta.OverlapsClosedInterval(interval.Mint, interval.Maxt) {
p.bufIter.Intervals = p.bufIter.Intervals.Add(interval)
}
}
hcr, ok := p.chunks.(*headChunkReader)
hcr, ok := p.cr.(*headChunkReader)
var iterable chunkenc.Iterable
if ok && copyHeadChunk && len(p.bufIter.Intervals) == 0 {
// ChunkWithCopy will copy the head chunk.
var maxt int64
p.currChkMeta.Chunk, maxt, p.err = hcr.ChunkWithCopy(p.currChkMeta)
p.currMeta.Chunk, maxt, p.err = hcr.ChunkWithCopy(p.currMeta)
// For the in-memory head chunk the index reader sets maxt as MaxInt64. We fix it here.
p.currChkMeta.MaxTime = maxt
p.currMeta.MaxTime = maxt
} else {
p.currChkMeta.Chunk, p.err = p.chunks.Chunk(p.currChkMeta)
p.currMeta.Chunk, iterable, p.err = p.cr.ChunkOrIterable(p.currMeta)
}
if p.err != nil {
p.err = errors.Wrapf(p.err, "cannot populate chunk %d from block %s", p.currChkMeta.Ref, p.blockID.String())
p.err = errors.Wrapf(p.err, "cannot populate chunk %d from block %s", p.currMeta.Ref, p.blockID.String())
return false
}
if len(p.bufIter.Intervals) == 0 {
// If there is no overlap with deletion intervals, we can take chunk as it is.
p.currDelIter = nil
// Use the single chunk if possible.
if p.currMeta.Chunk != nil {
if len(p.bufIter.Intervals) == 0 {
// If there is no overlap with deletion intervals and a single chunk is
// returned, we can take chunk as it is.
p.currDelIter = nil
return true
}
// Otherwise we need to iterate over the samples in the single chunk
// and create new chunks.
p.bufIter.Iter = p.currMeta.Chunk.Iterator(p.bufIter.Iter)
p.currDelIter = &p.bufIter
return true
}
// We don't want the full chunk, take just a part of it.
p.bufIter.Iter = p.currChkMeta.Chunk.Iterator(p.bufIter.Iter)
// Otherwise, use the iterable to create an iterator.
p.bufIter.Iter = iterable.Iterator(p.bufIter.Iter)
p.currDelIter = &p.bufIter
return true
}
@ -765,7 +782,7 @@ func (p *populateWithDelSeriesIterator) Next() chunkenc.ValueType {
if p.currDelIter != nil {
p.curr = p.currDelIter
} else {
p.curr = p.currChkMeta.Chunk.Iterator(p.curr)
p.curr = p.currMeta.Chunk.Iterator(p.curr)
}
if valueType := p.curr.Next(); valueType != chunkenc.ValNone {
return valueType
@ -817,22 +834,61 @@ func (p *populateWithDelSeriesIterator) Err() error {
type populateWithDelChunkSeriesIterator struct {
populateWithDelGenericSeriesIterator
curr chunks.Meta
// currMetaWithChunk is current meta with its chunk field set. This meta
// is guaranteed to map to a single chunk. This differs from
// populateWithDelGenericSeriesIterator.currMeta as that
// could refer to multiple chunks.
currMetaWithChunk chunks.Meta
// chunksFromIterable stores the chunks created from iterating through
// the iterable returned by cr.ChunkOrIterable() (with deleted samples
// removed).
chunksFromIterable []chunks.Meta
chunksFromIterableIdx int
}
func (p *populateWithDelChunkSeriesIterator) reset(blockID ulid.ULID, cr ChunkReader, chks []chunks.Meta, intervals tombstones.Intervals) {
p.populateWithDelGenericSeriesIterator.reset(blockID, cr, chks, intervals)
p.curr = chunks.Meta{}
p.currMetaWithChunk = chunks.Meta{}
p.chunksFromIterable = p.chunksFromIterable[:0]
p.chunksFromIterableIdx = -1
}
func (p *populateWithDelChunkSeriesIterator) Next() bool {
if p.currMeta.Chunk == nil {
// If we've been creating chunks from the iterable, check if there are
// any more chunks to iterate through.
if p.chunksFromIterableIdx < len(p.chunksFromIterable)-1 {
p.chunksFromIterableIdx++
p.currMetaWithChunk = p.chunksFromIterable[p.chunksFromIterableIdx]
return true
}
}
// Move to the next chunk/deletion iterator.
if !p.next(true) {
return false
}
p.curr = p.currChkMeta
if p.currDelIter == nil {
return true
if p.currMeta.Chunk != nil {
if p.currDelIter == nil {
p.currMetaWithChunk = p.currMeta
return true
}
// If ChunkOrIterable() returned a non-nil chunk, the samples in
// p.currDelIter will only form one chunk, as the only change
// p.currDelIter might make is deleting some samples.
return p.populateCurrForSingleChunk()
}
// If ChunkOrIterable() returned an iterable, multiple chunks may be
// created from the samples in p.currDelIter.
return p.populateChunksFromIterable()
}
// populateCurrForSingleChunk sets the fields within p.currMetaWithChunk. This
// should be called if the samples in p.currDelIter only form one chunk.
func (p *populateWithDelChunkSeriesIterator) populateCurrForSingleChunk() bool {
valueType := p.currDelIter.Next()
if valueType == chunkenc.ValNone {
if err := p.currDelIter.Err(); err != nil {
@ -840,9 +896,9 @@ func (p *populateWithDelChunkSeriesIterator) Next() bool {
}
return false
}
p.curr.MinTime = p.currDelIter.AtT()
p.currMetaWithChunk.MinTime = p.currDelIter.AtT()
// Re-encode the chunk if iterator is provider. This means that it has
// Re-encode the chunk if iterator is provided. This means that it has
// some samples to be deleted or chunk is opened.
var (
newChunk chunkenc.Chunk
@ -900,7 +956,7 @@ func (p *populateWithDelChunkSeriesIterator) Next() bool {
}
}
default:
err = fmt.Errorf("populateWithDelChunkSeriesIterator: value type %v unsupported", valueType)
err = fmt.Errorf("populateCurrForSingleChunk: value type %v unsupported", valueType)
}
if err != nil {
@ -912,12 +968,127 @@ func (p *populateWithDelChunkSeriesIterator) Next() bool {
return false
}
p.curr.Chunk = newChunk
p.curr.MaxTime = t
p.currMetaWithChunk.Chunk = newChunk
p.currMetaWithChunk.MaxTime = t
return true
}
func (p *populateWithDelChunkSeriesIterator) At() chunks.Meta { return p.curr }
// populateChunksFromIterable reads the samples from currDelIter to create
// chunks for chunksFromIterable. It also sets p.currMetaWithChunk to the first
// chunk.
func (p *populateWithDelChunkSeriesIterator) populateChunksFromIterable() bool {
p.chunksFromIterable = p.chunksFromIterable[:0]
p.chunksFromIterableIdx = -1
firstValueType := p.currDelIter.Next()
if firstValueType == chunkenc.ValNone {
if err := p.currDelIter.Err(); err != nil {
p.err = errors.Wrap(err, "populateChunksFromIterable: no samples could be read")
return false
}
return false
}
var (
// t is the timestamp for the current sample.
t int64
cmint int64
cmaxt int64
currentChunk chunkenc.Chunk
app chunkenc.Appender
newChunk chunkenc.Chunk
recoded bool
err error
)
prevValueType := chunkenc.ValNone
for currentValueType := firstValueType; currentValueType != chunkenc.ValNone; currentValueType = p.currDelIter.Next() {
// Check if the encoding has changed (i.e. we need to create a new
// chunk as chunks can't have multiple encoding types).
// For the first sample, the following condition will always be true as
// ValNone != ValFloat | ValHistogram | ValFloatHistogram.
if currentValueType != prevValueType {
if prevValueType != chunkenc.ValNone {
p.chunksFromIterable = append(p.chunksFromIterable, chunks.Meta{Chunk: currentChunk, MinTime: cmint, MaxTime: cmaxt})
}
cmint = p.currDelIter.AtT()
if currentChunk, err = currentValueType.NewChunk(); err != nil {
break
}
if app, err = currentChunk.Appender(); err != nil {
break
}
}
switch currentValueType {
case chunkenc.ValFloat:
{
var v float64
t, v = p.currDelIter.At()
app.Append(t, v)
}
case chunkenc.ValHistogram:
{
var v *histogram.Histogram
t, v = p.currDelIter.AtHistogram()
// No need to set prevApp as AppendHistogram will set the
// counter reset header for the appender that's returned.
newChunk, recoded, app, err = app.AppendHistogram(nil, t, v, false)
}
case chunkenc.ValFloatHistogram:
{
var v *histogram.FloatHistogram
t, v = p.currDelIter.AtFloatHistogram()
// No need to set prevApp as AppendHistogram will set the
// counter reset header for the appender that's returned.
newChunk, recoded, app, err = app.AppendFloatHistogram(nil, t, v, false)
}
}
if err != nil {
break
}
if newChunk != nil {
if !recoded {
p.chunksFromIterable = append(p.chunksFromIterable, chunks.Meta{Chunk: currentChunk, MinTime: cmint, MaxTime: cmaxt})
}
currentChunk = newChunk
cmint = t
}
cmaxt = t
prevValueType = currentValueType
}
if err != nil {
p.err = errors.Wrap(err, "populateChunksFromIterable: error when writing new chunks")
return false
}
if err = p.currDelIter.Err(); err != nil {
p.err = errors.Wrap(err, "populateChunksFromIterable: currDelIter error when writing new chunks")
return false
}
if prevValueType != chunkenc.ValNone {
p.chunksFromIterable = append(p.chunksFromIterable, chunks.Meta{Chunk: currentChunk, MinTime: cmint, MaxTime: cmaxt})
}
if len(p.chunksFromIterable) == 0 {
return false
}
p.currMetaWithChunk = p.chunksFromIterable[0]
p.chunksFromIterableIdx = 0
return true
}
func (p *populateWithDelChunkSeriesIterator) At() chunks.Meta { return p.currMetaWithChunk }
// blockSeriesSet allows to iterate over sorted, populated series with applied tombstones.
// Series with all deleted chunks are still present as Series with no samples.
@ -1117,8 +1288,8 @@ func newNopChunkReader() ChunkReader {
}
}
func (cr nopChunkReader) Chunk(chunks.Meta) (chunkenc.Chunk, error) {
return cr.emptyChunk, nil
func (cr nopChunkReader) ChunkOrIterable(chunks.Meta) (chunkenc.Chunk, chunkenc.Iterable, error) {
return cr.emptyChunk, nil, nil
}
func (cr nopChunkReader) Close() error { return nil }


@ -685,12 +685,14 @@ func TestBlockQuerierDelete(t *testing.T) {
type fakeChunksReader struct {
ChunkReader
chks map[chunks.ChunkRef]chunkenc.Chunk
chks map[chunks.ChunkRef]chunkenc.Chunk
iterables map[chunks.ChunkRef]chunkenc.Iterable
}
func createFakeReaderAndNotPopulatedChunks(s ...[]chunks.Sample) (*fakeChunksReader, []chunks.Meta) {
f := &fakeChunksReader{
chks: map[chunks.ChunkRef]chunkenc.Chunk{},
chks: map[chunks.ChunkRef]chunkenc.Chunk{},
iterables: map[chunks.ChunkRef]chunkenc.Iterable{},
}
chks := make([]chunks.Meta, 0, len(s))
@ -707,21 +709,102 @@ func createFakeReaderAndNotPopulatedChunks(s ...[]chunks.Sample) (*fakeChunksRea
return f, chks
}
func (r *fakeChunksReader) Chunk(meta chunks.Meta) (chunkenc.Chunk, error) {
chk, ok := r.chks[meta.Ref]
if !ok {
return nil, fmt.Errorf("chunk not found at ref %v", meta.Ref)
// Samples in each slice are assumed to be sorted.
func createFakeReaderAndIterables(s ...[]chunks.Sample) (*fakeChunksReader, []chunks.Meta) {
f := &fakeChunksReader{
chks: map[chunks.ChunkRef]chunkenc.Chunk{},
iterables: map[chunks.ChunkRef]chunkenc.Iterable{},
}
return chk, nil
chks := make([]chunks.Meta, 0, len(s))
for ref, samples := range s {
f.iterables[chunks.ChunkRef(ref)] = &mockIterable{s: samples}
var minTime, maxTime int64
if len(samples) > 0 {
minTime = samples[0].T()
maxTime = samples[len(samples)-1].T()
}
chks = append(chks, chunks.Meta{
Ref: chunks.ChunkRef(ref),
MinTime: minTime,
MaxTime: maxTime,
})
}
return f, chks
}
func (r *fakeChunksReader) ChunkOrIterable(meta chunks.Meta) (chunkenc.Chunk, chunkenc.Iterable, error) {
if chk, ok := r.chks[meta.Ref]; ok {
return chk, nil, nil
}
if it, ok := r.iterables[meta.Ref]; ok {
return nil, it, nil
}
return nil, nil, fmt.Errorf("chunk or iterable not found at ref %v", meta.Ref)
}
type mockIterable struct {
s []chunks.Sample
}
func (it *mockIterable) Iterator(chunkenc.Iterator) chunkenc.Iterator {
return &mockSampleIterator{
s: it.s,
idx: -1,
}
}
type mockSampleIterator struct {
s []chunks.Sample
idx int
}
func (it *mockSampleIterator) Seek(t int64) chunkenc.ValueType {
for ; it.idx < len(it.s); it.idx++ {
if it.idx != -1 && it.s[it.idx].T() >= t {
return it.s[it.idx].Type()
}
}
return chunkenc.ValNone
}
func (it *mockSampleIterator) At() (int64, float64) {
return it.s[it.idx].T(), it.s[it.idx].F()
}
func (it *mockSampleIterator) AtHistogram() (int64, *histogram.Histogram) {
return it.s[it.idx].T(), it.s[it.idx].H()
}
func (it *mockSampleIterator) AtFloatHistogram() (int64, *histogram.FloatHistogram) {
return it.s[it.idx].T(), it.s[it.idx].FH()
}
func (it *mockSampleIterator) AtT() int64 {
return it.s[it.idx].T()
}
func (it *mockSampleIterator) Next() chunkenc.ValueType {
if it.idx < len(it.s)-1 {
it.idx++
return it.s[it.idx].Type()
}
return chunkenc.ValNone
}
func (it *mockSampleIterator) Err() error { return nil }
func TestPopulateWithTombSeriesIterators(t *testing.T) {
type minMaxTimes struct {
minTime, maxTime int64
}
cases := []struct {
name string
chks [][]chunks.Sample
name string
samples [][]chunks.Sample
expected []chunks.Sample
expectedChks []chunks.Meta
@ -732,23 +815,38 @@ func TestPopulateWithTombSeriesIterators(t *testing.T) {
// Seek being zero means do not test seek.
seek int64
seekSuccess bool
// Set this to true if a sample slice will form multiple chunks.
skipChunkTest bool
skipIterableTest bool
}{
{
name: "no chunk",
chks: [][]chunks.Sample{},
name: "no chunk",
samples: [][]chunks.Sample{},
},
{
name: "one empty chunk", // This should never happen.
chks: [][]chunks.Sample{{}},
name: "one empty chunk", // This should never happen.
samples: [][]chunks.Sample{{}},
expectedChks: []chunks.Meta{
assureChunkFromSamples(t, []chunks.Sample{}),
},
expectedMinMaxTimes: []minMaxTimes{{0, 0}},
// iterables with no samples will return no chunks instead of empty chunks
skipIterableTest: true,
},
{
name: "three empty chunks", // This should never happen.
chks: [][]chunks.Sample{{}, {}, {}},
name: "one empty iterable",
samples: [][]chunks.Sample{{}},
// iterables with no samples will return no chunks
expectedChks: nil,
skipChunkTest: true,
},
{
name: "three empty chunks", // This should never happen.
samples: [][]chunks.Sample{{}, {}, {}},
expectedChks: []chunks.Meta{
assureChunkFromSamples(t, []chunks.Sample{}),
@ -756,10 +854,20 @@ func TestPopulateWithTombSeriesIterators(t *testing.T) {
assureChunkFromSamples(t, []chunks.Sample{}),
},
expectedMinMaxTimes: []minMaxTimes{{0, 0}, {0, 0}, {0, 0}},
// iterables with no samples will return no chunks instead of empty chunks
skipIterableTest: true,
},
{
name: "three empty iterables",
samples: [][]chunks.Sample{{}, {}, {}},
// iterables with no samples will return no chunks
expectedChks: nil,
skipChunkTest: true,
},
{
name: "one chunk",
chks: [][]chunks.Sample{
samples: [][]chunks.Sample{
{sample{1, 2, nil, nil}, sample{2, 3, nil, nil}, sample{3, 5, nil, nil}, sample{6, 1, nil, nil}},
},
@ -775,7 +883,7 @@ func TestPopulateWithTombSeriesIterators(t *testing.T) {
},
{
name: "two full chunks",
chks: [][]chunks.Sample{
samples: [][]chunks.Sample{
{sample{1, 2, nil, nil}, sample{2, 3, nil, nil}, sample{3, 5, nil, nil}, sample{6, 1, nil, nil}},
{sample{7, 89, nil, nil}, sample{9, 8, nil, nil}},
},
@ -795,7 +903,7 @@ func TestPopulateWithTombSeriesIterators(t *testing.T) {
},
{
name: "three full chunks",
chks: [][]chunks.Sample{
samples: [][]chunks.Sample{
{sample{1, 2, nil, nil}, sample{2, 3, nil, nil}, sample{3, 5, nil, nil}, sample{6, 1, nil, nil}},
{sample{7, 89, nil, nil}, sample{9, 8, nil, nil}},
{sample{10, 22, nil, nil}, sample{203, 3493, nil, nil}},
@ -819,15 +927,15 @@ func TestPopulateWithTombSeriesIterators(t *testing.T) {
},
// Seek cases.
{
name: "three empty chunks and seek", // This should never happen.
chks: [][]chunks.Sample{{}, {}, {}},
seek: 1,
name: "three empty chunks and seek", // This should never happen.
samples: [][]chunks.Sample{{}, {}, {}},
seek: 1,
seekSuccess: false,
},
{
name: "two chunks and seek beyond chunks",
chks: [][]chunks.Sample{
samples: [][]chunks.Sample{
{sample{1, 2, nil, nil}, sample{3, 5, nil, nil}, sample{6, 1, nil, nil}},
{sample{7, 89, nil, nil}, sample{9, 8, nil, nil}},
},
@ -837,7 +945,7 @@ func TestPopulateWithTombSeriesIterators(t *testing.T) {
},
{
name: "two chunks and seek on middle of first chunk",
chks: [][]chunks.Sample{
samples: [][]chunks.Sample{
{sample{1, 2, nil, nil}, sample{3, 5, nil, nil}, sample{6, 1, nil, nil}},
{sample{7, 89, nil, nil}, sample{9, 8, nil, nil}},
},
@ -850,7 +958,7 @@ func TestPopulateWithTombSeriesIterators(t *testing.T) {
},
{
name: "two chunks and seek before first chunk",
chks: [][]chunks.Sample{
samples: [][]chunks.Sample{
{sample{1, 2, nil, nil}, sample{3, 5, nil, nil}, sample{6, 1, nil, nil}},
{sample{7, 89, nil, nil}, sample{9, 8, nil, nil}},
},
@ -864,12 +972,12 @@ func TestPopulateWithTombSeriesIterators(t *testing.T) {
// Deletion / Trim cases.
{
name: "no chunk with deletion interval",
chks: [][]chunks.Sample{},
samples: [][]chunks.Sample{},
intervals: tombstones.Intervals{{Mint: 20, Maxt: 21}},
},
{
name: "two chunks with trimmed first and last samples from edge chunks",
chks: [][]chunks.Sample{
samples: [][]chunks.Sample{
{sample{1, 2, nil, nil}, sample{2, 3, nil, nil}, sample{3, 5, nil, nil}, sample{6, 1, nil, nil}},
{sample{7, 89, nil, nil}, sample{9, 8, nil, nil}},
},
@ -890,7 +998,7 @@ func TestPopulateWithTombSeriesIterators(t *testing.T) {
},
{
name: "two chunks with trimmed middle sample of first chunk",
chks: [][]chunks.Sample{
samples: [][]chunks.Sample{
{sample{1, 2, nil, nil}, sample{2, 3, nil, nil}, sample{3, 5, nil, nil}, sample{6, 1, nil, nil}},
{sample{7, 89, nil, nil}, sample{9, 8, nil, nil}},
},
@ -911,7 +1019,7 @@ func TestPopulateWithTombSeriesIterators(t *testing.T) {
},
{
name: "two chunks with deletion across two chunks",
chks: [][]chunks.Sample{
samples: [][]chunks.Sample{
{sample{1, 2, nil, nil}, sample{2, 3, nil, nil}, sample{3, 5, nil, nil}, sample{6, 1, nil, nil}},
{sample{7, 89, nil, nil}, sample{9, 8, nil, nil}},
},
@ -933,7 +1041,7 @@ func TestPopulateWithTombSeriesIterators(t *testing.T) {
// Deletion with seek.
{
name: "two chunks with trimmed first and last samples from edge chunks, seek from middle of first chunk",
chks: [][]chunks.Sample{
samples: [][]chunks.Sample{
{sample{1, 2, nil, nil}, sample{2, 3, nil, nil}, sample{3, 5, nil, nil}, sample{6, 1, nil, nil}},
{sample{7, 89, nil, nil}, sample{9, 8, nil, nil}},
},
@ -945,9 +1053,20 @@ func TestPopulateWithTombSeriesIterators(t *testing.T) {
sample{3, 5, nil, nil}, sample{6, 1, nil, nil}, sample{7, 89, nil, nil},
},
},
{
name: "one chunk where all samples are trimmed",
samples: [][]chunks.Sample{
{sample{2, 3, nil, nil}, sample{3, 5, nil, nil}, sample{6, 1, nil, nil}},
{sample{7, 89, nil, nil}, sample{9, 8, nil, nil}},
},
intervals: tombstones.Intervals{{Mint: math.MinInt64, Maxt: 3}}.Add(tombstones.Interval{Mint: 4, Maxt: math.MaxInt64}),
expected: nil,
expectedChks: nil,
},
{
name: "one histogram chunk",
chks: [][]chunks.Sample{
samples: [][]chunks.Sample{
{
sample{1, 0, tsdbutil.GenerateTestHistogram(1), nil},
sample{2, 0, tsdbutil.GenerateTestHistogram(2), nil},
@ -973,7 +1092,7 @@ func TestPopulateWithTombSeriesIterators(t *testing.T) {
},
{
name: "one histogram chunk intersect with earlier deletion interval",
chks: [][]chunks.Sample{
samples: [][]chunks.Sample{
{
sample{1, 0, tsdbutil.GenerateTestHistogram(1), nil},
sample{2, 0, tsdbutil.GenerateTestHistogram(2), nil},
@ -996,7 +1115,7 @@ func TestPopulateWithTombSeriesIterators(t *testing.T) {
},
{
name: "one histogram chunk intersect with later deletion interval",
chks: [][]chunks.Sample{
samples: [][]chunks.Sample{
{
sample{1, 0, tsdbutil.GenerateTestHistogram(1), nil},
sample{2, 0, tsdbutil.GenerateTestHistogram(2), nil},
@ -1021,7 +1140,7 @@ func TestPopulateWithTombSeriesIterators(t *testing.T) {
},
{
name: "one float histogram chunk",
chks: [][]chunks.Sample{
samples: [][]chunks.Sample{
{
sample{1, 0, nil, tsdbutil.GenerateTestFloatHistogram(1)},
sample{2, 0, nil, tsdbutil.GenerateTestFloatHistogram(2)},
@ -1047,7 +1166,7 @@ func TestPopulateWithTombSeriesIterators(t *testing.T) {
},
{
name: "one float histogram chunk intersect with earlier deletion interval",
chks: [][]chunks.Sample{
samples: [][]chunks.Sample{
{
sample{1, 0, nil, tsdbutil.GenerateTestFloatHistogram(1)},
sample{2, 0, nil, tsdbutil.GenerateTestFloatHistogram(2)},
@ -1070,7 +1189,7 @@ func TestPopulateWithTombSeriesIterators(t *testing.T) {
},
{
name: "one float histogram chunk intersect with later deletion interval",
chks: [][]chunks.Sample{
samples: [][]chunks.Sample{
{
sample{1, 0, nil, tsdbutil.GenerateTestFloatHistogram(1)},
sample{2, 0, nil, tsdbutil.GenerateTestFloatHistogram(2)},
@ -1095,7 +1214,7 @@ func TestPopulateWithTombSeriesIterators(t *testing.T) {
},
{
name: "one gauge histogram chunk",
chks: [][]chunks.Sample{
samples: [][]chunks.Sample{
{
sample{1, 0, tsdbutil.GenerateTestGaugeHistogram(1), nil},
sample{2, 0, tsdbutil.GenerateTestGaugeHistogram(2), nil},
@ -1121,7 +1240,7 @@ func TestPopulateWithTombSeriesIterators(t *testing.T) {
},
{
name: "one gauge histogram chunk intersect with earlier deletion interval",
chks: [][]chunks.Sample{
samples: [][]chunks.Sample{
{
sample{1, 0, tsdbutil.GenerateTestGaugeHistogram(1), nil},
sample{2, 0, tsdbutil.GenerateTestGaugeHistogram(2), nil},
@ -1144,7 +1263,7 @@ func TestPopulateWithTombSeriesIterators(t *testing.T) {
},
{
name: "one gauge histogram chunk intersect with later deletion interval",
chks: [][]chunks.Sample{
samples: [][]chunks.Sample{
{
sample{1, 0, tsdbutil.GenerateTestGaugeHistogram(1), nil},
sample{2, 0, tsdbutil.GenerateTestGaugeHistogram(2), nil},
@ -1169,7 +1288,7 @@ func TestPopulateWithTombSeriesIterators(t *testing.T) {
},
{
name: "one gauge float histogram",
chks: [][]chunks.Sample{
samples: [][]chunks.Sample{
{
sample{1, 0, nil, tsdbutil.GenerateTestGaugeFloatHistogram(1)},
sample{2, 0, nil, tsdbutil.GenerateTestGaugeFloatHistogram(2)},
@ -1195,7 +1314,7 @@ func TestPopulateWithTombSeriesIterators(t *testing.T) {
},
{
name: "one gauge float histogram chunk intersect with earlier deletion interval",
chks: [][]chunks.Sample{
samples: [][]chunks.Sample{
{
sample{1, 0, nil, tsdbutil.GenerateTestGaugeFloatHistogram(1)},
sample{2, 0, nil, tsdbutil.GenerateTestGaugeFloatHistogram(2)},
@ -1218,7 +1337,7 @@ func TestPopulateWithTombSeriesIterators(t *testing.T) {
},
{
name: "one gauge float histogram chunk intersect with later deletion interval",
chks: [][]chunks.Sample{
samples: [][]chunks.Sample{
{
sample{1, 0, nil, tsdbutil.GenerateTestGaugeFloatHistogram(1)},
sample{2, 0, nil, tsdbutil.GenerateTestGaugeFloatHistogram(2)},
@ -1243,7 +1362,7 @@ func TestPopulateWithTombSeriesIterators(t *testing.T) {
},
{
name: "three full mixed chunks",
chks: [][]chunks.Sample{
samples: [][]chunks.Sample{
{sample{1, 2, nil, nil}, sample{2, 3, nil, nil}, sample{3, 5, nil, nil}, sample{6, 1, nil, nil}},
{
sample{7, 0, tsdbutil.GenerateTestGaugeHistogram(89), nil},
@ -1275,7 +1394,7 @@ func TestPopulateWithTombSeriesIterators(t *testing.T) {
},
{
name: "three full mixed chunks in different order",
chks: [][]chunks.Sample{
samples: [][]chunks.Sample{
{
sample{7, 0, tsdbutil.GenerateTestGaugeHistogram(89), nil},
sample{9, 0, tsdbutil.GenerateTestGaugeHistogram(8), nil},
@ -1307,7 +1426,7 @@ func TestPopulateWithTombSeriesIterators(t *testing.T) {
},
{
name: "three full mixed chunks in different order intersect with deletion interval",
chks: [][]chunks.Sample{
samples: [][]chunks.Sample{
{
sample{7, 0, tsdbutil.GenerateTestGaugeHistogram(89), nil},
sample{9, 0, tsdbutil.GenerateTestGaugeHistogram(8), nil},
@ -1338,7 +1457,7 @@ func TestPopulateWithTombSeriesIterators(t *testing.T) {
},
{
name: "three full mixed chunks overlapping",
chks: [][]chunks.Sample{
samples: [][]chunks.Sample{
{
sample{7, 0, tsdbutil.GenerateTestGaugeHistogram(89), nil},
sample{12, 0, tsdbutil.GenerateTestGaugeHistogram(8), nil},
@ -1368,11 +1487,237 @@ func TestPopulateWithTombSeriesIterators(t *testing.T) {
},
expectedMinMaxTimes: []minMaxTimes{{7, 12}, {11, 16}, {10, 203}},
},
{
// This case won't actually happen until OOO native histograms is implemented.
// Issue: https://github.com/prometheus/prometheus/issues/11220.
name: "int histogram iterables with counter resets",
samples: [][]chunks.Sample{
{
sample{7, 0, tsdbutil.GenerateTestHistogram(8), nil},
sample{8, 0, tsdbutil.GenerateTestHistogram(9), nil},
// Counter reset should be detected when chunks are created from the iterable.
sample{12, 0, tsdbutil.GenerateTestHistogram(5), nil},
sample{15, 0, tsdbutil.GenerateTestHistogram(6), nil},
sample{16, 0, tsdbutil.GenerateTestHistogram(7), nil},
// Counter reset should be detected when chunks are created from the iterable.
sample{17, 0, tsdbutil.GenerateTestHistogram(5), nil},
},
{
sample{18, 0, tsdbutil.GenerateTestHistogram(6), nil},
sample{19, 0, tsdbutil.GenerateTestHistogram(7), nil},
// Counter reset should be detected when chunks are created from the iterable.
sample{20, 0, tsdbutil.GenerateTestHistogram(5), nil},
sample{21, 0, tsdbutil.GenerateTestHistogram(6), nil},
},
},
expected: []chunks.Sample{
sample{7, 0, tsdbutil.GenerateTestHistogram(8), nil},
sample{8, 0, tsdbutil.GenerateTestHistogram(9), nil},
sample{12, 0, tsdbutil.GenerateTestHistogram(5), nil},
sample{15, 0, tsdbutil.GenerateTestHistogram(6), nil},
sample{16, 0, tsdbutil.GenerateTestHistogram(7), nil},
sample{17, 0, tsdbutil.GenerateTestHistogram(5), nil},
sample{18, 0, tsdbutil.GenerateTestHistogram(6), nil},
sample{19, 0, tsdbutil.GenerateTestHistogram(7), nil},
sample{20, 0, tsdbutil.GenerateTestHistogram(5), nil},
sample{21, 0, tsdbutil.GenerateTestHistogram(6), nil},
},
expectedChks: []chunks.Meta{
assureChunkFromSamples(t, []chunks.Sample{
sample{7, 0, tsdbutil.GenerateTestHistogram(8), nil},
sample{8, 0, tsdbutil.SetHistogramNotCounterReset(tsdbutil.GenerateTestHistogram(9)), nil},
}),
assureChunkFromSamples(t, []chunks.Sample{
sample{12, 0, tsdbutil.SetHistogramCounterReset(tsdbutil.GenerateTestHistogram(5)), nil},
sample{15, 0, tsdbutil.SetHistogramNotCounterReset(tsdbutil.GenerateTestHistogram(6)), nil},
sample{16, 0, tsdbutil.SetHistogramNotCounterReset(tsdbutil.GenerateTestHistogram(7)), nil},
}),
assureChunkFromSamples(t, []chunks.Sample{
sample{17, 0, tsdbutil.SetHistogramCounterReset(tsdbutil.GenerateTestHistogram(5)), nil},
}),
assureChunkFromSamples(t, []chunks.Sample{
sample{18, 0, tsdbutil.GenerateTestHistogram(6), nil},
sample{19, 0, tsdbutil.SetHistogramNotCounterReset(tsdbutil.GenerateTestHistogram(7)), nil},
}),
assureChunkFromSamples(t, []chunks.Sample{
sample{20, 0, tsdbutil.SetHistogramCounterReset(tsdbutil.GenerateTestHistogram(5)), nil},
sample{21, 0, tsdbutil.SetHistogramNotCounterReset(tsdbutil.GenerateTestHistogram(6)), nil},
}),
},
expectedMinMaxTimes: []minMaxTimes{
{7, 8},
{12, 16},
{17, 17},
{18, 19},
{20, 21},
},
// Skipping chunk test - can't create a single chunk for each
// sample slice since there are counter resets in the middle of
// the slices.
skipChunkTest: true,
},
{
// This case won't actually happen until OOO native histograms is implemented.
// Issue: https://github.com/prometheus/prometheus/issues/11220.
name: "float histogram iterables with counter resets",
samples: [][]chunks.Sample{
{
sample{7, 0, nil, tsdbutil.GenerateTestFloatHistogram(8)},
sample{8, 0, nil, tsdbutil.GenerateTestFloatHistogram(9)},
// Counter reset should be detected when chunks are created from the iterable.
sample{12, 0, nil, tsdbutil.GenerateTestFloatHistogram(5)},
sample{15, 0, nil, tsdbutil.GenerateTestFloatHistogram(6)},
sample{16, 0, nil, tsdbutil.GenerateTestFloatHistogram(7)},
// Counter reset should be detected when chunks are created from the iterable.
sample{17, 0, nil, tsdbutil.GenerateTestFloatHistogram(5)},
},
{
sample{18, 0, nil, tsdbutil.GenerateTestFloatHistogram(6)},
sample{19, 0, nil, tsdbutil.GenerateTestFloatHistogram(7)},
// Counter reset should be detected when chunks are created from the iterable.
sample{20, 0, nil, tsdbutil.GenerateTestFloatHistogram(5)},
sample{21, 0, nil, tsdbutil.GenerateTestFloatHistogram(6)},
},
},
expected: []chunks.Sample{
sample{7, 0, nil, tsdbutil.GenerateTestFloatHistogram(8)},
sample{8, 0, nil, tsdbutil.GenerateTestFloatHistogram(9)},
sample{12, 0, nil, tsdbutil.GenerateTestFloatHistogram(5)},
sample{15, 0, nil, tsdbutil.GenerateTestFloatHistogram(6)},
sample{16, 0, nil, tsdbutil.GenerateTestFloatHistogram(7)},
sample{17, 0, nil, tsdbutil.GenerateTestFloatHistogram(5)},
sample{18, 0, nil, tsdbutil.GenerateTestFloatHistogram(6)},
sample{19, 0, nil, tsdbutil.GenerateTestFloatHistogram(7)},
sample{20, 0, nil, tsdbutil.GenerateTestFloatHistogram(5)},
sample{21, 0, nil, tsdbutil.GenerateTestFloatHistogram(6)},
},
expectedChks: []chunks.Meta{
assureChunkFromSamples(t, []chunks.Sample{
sample{7, 0, nil, tsdbutil.GenerateTestFloatHistogram(8)},
sample{8, 0, nil, tsdbutil.SetFloatHistogramNotCounterReset(tsdbutil.GenerateTestFloatHistogram(9))},
}),
assureChunkFromSamples(t, []chunks.Sample{
sample{12, 0, nil, tsdbutil.SetFloatHistogramCounterReset(tsdbutil.GenerateTestFloatHistogram(5))},
sample{15, 0, nil, tsdbutil.SetFloatHistogramNotCounterReset(tsdbutil.GenerateTestFloatHistogram(6))},
sample{16, 0, nil, tsdbutil.SetFloatHistogramNotCounterReset(tsdbutil.GenerateTestFloatHistogram(7))},
}),
assureChunkFromSamples(t, []chunks.Sample{
sample{17, 0, nil, tsdbutil.SetFloatHistogramCounterReset(tsdbutil.GenerateTestFloatHistogram(5))},
}),
assureChunkFromSamples(t, []chunks.Sample{
sample{18, 0, nil, tsdbutil.GenerateTestFloatHistogram(6)},
sample{19, 0, nil, tsdbutil.SetFloatHistogramNotCounterReset(tsdbutil.GenerateTestFloatHistogram(7))},
}),
assureChunkFromSamples(t, []chunks.Sample{
sample{20, 0, nil, tsdbutil.SetFloatHistogramCounterReset(tsdbutil.GenerateTestFloatHistogram(5))},
sample{21, 0, nil, tsdbutil.SetFloatHistogramNotCounterReset(tsdbutil.GenerateTestFloatHistogram(6))},
}),
},
expectedMinMaxTimes: []minMaxTimes{
{7, 8},
{12, 16},
{17, 17},
{18, 19},
{20, 21},
},
// Skipping chunk test - can't create a single chunk for each
// sample slice since there are counter resets in the middle of
// the slices.
skipChunkTest: true,
},
{
// This case won't actually happen until OOO native histograms is implemented.
// Issue: https://github.com/prometheus/prometheus/issues/11220.
name: "iterables with mixed encodings and counter resets",
samples: [][]chunks.Sample{
{
sample{7, 0, tsdbutil.GenerateTestHistogram(8), nil},
sample{8, 0, tsdbutil.GenerateTestHistogram(9), nil},
sample{9, 0, nil, tsdbutil.GenerateTestFloatHistogram(10)},
sample{10, 0, nil, tsdbutil.GenerateTestFloatHistogram(11)},
sample{11, 0, nil, tsdbutil.GenerateTestFloatHistogram(12)},
sample{12, 13, nil, nil},
sample{13, 14, nil, nil},
sample{14, 0, tsdbutil.GenerateTestHistogram(8), nil},
// Counter reset should be detected when chunks are created from the iterable.
sample{15, 0, tsdbutil.GenerateTestHistogram(7), nil},
},
{
sample{18, 0, tsdbutil.GenerateTestHistogram(6), nil},
sample{19, 45, nil, nil},
},
},
expected: []chunks.Sample{
sample{7, 0, tsdbutil.GenerateTestHistogram(8), nil},
sample{8, 0, tsdbutil.GenerateTestHistogram(9), nil},
sample{9, 0, nil, tsdbutil.GenerateTestFloatHistogram(10)},
sample{10, 0, nil, tsdbutil.GenerateTestFloatHistogram(11)},
sample{11, 0, nil, tsdbutil.GenerateTestFloatHistogram(12)},
sample{12, 13, nil, nil},
sample{13, 14, nil, nil},
sample{14, 0, tsdbutil.GenerateTestHistogram(8), nil},
sample{15, 0, tsdbutil.GenerateTestHistogram(7), nil},
sample{18, 0, tsdbutil.GenerateTestHistogram(6), nil},
sample{19, 45, nil, nil},
},
expectedChks: []chunks.Meta{
assureChunkFromSamples(t, []chunks.Sample{
sample{7, 0, tsdbutil.GenerateTestHistogram(8), nil},
sample{8, 0, tsdbutil.GenerateTestHistogram(9), nil},
}),
assureChunkFromSamples(t, []chunks.Sample{
sample{9, 0, nil, tsdbutil.GenerateTestFloatHistogram(10)},
sample{10, 0, nil, tsdbutil.GenerateTestFloatHistogram(11)},
sample{11, 0, nil, tsdbutil.GenerateTestFloatHistogram(12)},
}),
assureChunkFromSamples(t, []chunks.Sample{
sample{12, 13, nil, nil},
sample{13, 14, nil, nil},
}),
assureChunkFromSamples(t, []chunks.Sample{
sample{14, 0, tsdbutil.GenerateTestHistogram(8), nil},
}),
assureChunkFromSamples(t, []chunks.Sample{
sample{15, 0, tsdbutil.SetHistogramCounterReset(tsdbutil.GenerateTestHistogram(7)), nil},
}),
assureChunkFromSamples(t, []chunks.Sample{
sample{18, 0, tsdbutil.GenerateTestHistogram(6), nil},
}),
assureChunkFromSamples(t, []chunks.Sample{
sample{19, 45, nil, nil},
}),
},
expectedMinMaxTimes: []minMaxTimes{
{7, 8},
{9, 11},
{12, 13},
{14, 14},
{15, 15},
{18, 18},
{19, 19},
},
skipChunkTest: true,
},
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
t.Run("sample", func(t *testing.T) {
f, chkMetas := createFakeReaderAndNotPopulatedChunks(tc.chks...)
var f *fakeChunksReader
var chkMetas []chunks.Meta
// If the test case wants to skip the chunks test, it probably
// means you can't create valid chunks from sample slices,
// therefore create iterables over the samples instead.
if tc.skipChunkTest {
f, chkMetas = createFakeReaderAndIterables(tc.samples...)
} else {
f, chkMetas = createFakeReaderAndNotPopulatedChunks(tc.samples...)
}
it := &populateWithDelSeriesIterator{}
it.reset(ulid.ULID{}, f, chkMetas, tc.intervals)
@ -1393,7 +1738,35 @@ func TestPopulateWithTombSeriesIterators(t *testing.T) {
require.Equal(t, tc.expected, r)
})
t.Run("chunk", func(t *testing.T) {
f, chkMetas := createFakeReaderAndNotPopulatedChunks(tc.chks...)
if tc.skipChunkTest {
t.Skip()
}
f, chkMetas := createFakeReaderAndNotPopulatedChunks(tc.samples...)
it := &populateWithDelChunkSeriesIterator{}
it.reset(ulid.ULID{}, f, chkMetas, tc.intervals)
if tc.seek != 0 {
// Chunk iterator does not have Seek method.
return
}
expandedResult, err := storage.ExpandChunks(it)
require.NoError(t, err)
// We don't care about ref IDs for comparison, only chunk's samples matters.
rmChunkRefs(expandedResult)
rmChunkRefs(tc.expectedChks)
require.Equal(t, tc.expectedChks, expandedResult)
for i, meta := range expandedResult {
require.Equal(t, tc.expectedMinMaxTimes[i].minTime, meta.MinTime)
require.Equal(t, tc.expectedMinMaxTimes[i].maxTime, meta.MaxTime)
}
})
t.Run("iterables", func(t *testing.T) {
if tc.skipIterableTest {
t.Skip()
}
f, chkMetas := createFakeReaderAndIterables(tc.samples...)
it := &populateWithDelChunkSeriesIterator{}
it.reset(ulid.ULID{}, f, chkMetas, tc.intervals)
@ -1686,13 +2059,13 @@ func BenchmarkMergedSeriesSet(b *testing.B) {
type mockChunkReader map[chunks.ChunkRef]chunkenc.Chunk
func (cr mockChunkReader) Chunk(meta chunks.Meta) (chunkenc.Chunk, error) {
func (cr mockChunkReader) ChunkOrIterable(meta chunks.Meta) (chunkenc.Chunk, chunkenc.Iterable, error) {
chk, ok := cr[meta.Ref]
if ok {
return chk, nil
return chk, nil, nil
}
return nil, errors.New("Chunk with ref not found")
return nil, nil, errors.New("Chunk with ref not found")
}
func (cr mockChunkReader) Close() error {
@ -3020,7 +3393,7 @@ func TestBlockBaseSeriesSet(t *testing.T) {
idx := tc.expIdxs[i]
require.Equal(t, tc.series[idx].lset, bcs.curr.labels)
require.Equal(t, tc.series[idx].chunks, si.chks)
require.Equal(t, tc.series[idx].chunks, si.metas)
i++
}


@ -116,7 +116,17 @@ func SetHistogramNotCounterReset(h *histogram.Histogram) *histogram.Histogram {
return h
}
func SetHistogramCounterReset(h *histogram.Histogram) *histogram.Histogram {
h.CounterResetHint = histogram.CounterReset
return h
}
func SetFloatHistogramNotCounterReset(h *histogram.FloatHistogram) *histogram.FloatHistogram {
h.CounterResetHint = histogram.NotCounterReset
return h
}
func SetFloatHistogramCounterReset(h *histogram.FloatHistogram) *histogram.FloatHistogram {
h.CounterResetHint = histogram.CounterReset
return h
}
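
In tests, these helpers pin the expected CounterResetHint on generated histograms, for example:

h := tsdbutil.GenerateTestHistogram(5)
h = tsdbutil.SetHistogramCounterReset(h) // h.CounterResetHint is now histogram.CounterReset.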