diff --git a/tsdb/chunks/chunks.go b/tsdb/chunks/chunks.go index e7df0eeed..ec0f6d403 100644 --- a/tsdb/chunks/chunks.go +++ b/tsdb/chunks/chunks.go @@ -133,15 +133,6 @@ type Meta struct { // Time range the data covers. // When MaxTime == math.MaxInt64 the chunk is still open and being appended to. MinTime, MaxTime int64 - - // OOOLastRef, OOOLastMinTime and OOOLastMaxTime are kept as markers for - // overlapping chunks. - // These fields point to the last created out of order Chunk (the head) that existed - // when Series() was called and was overlapping. - // Series() and Chunk() method responses should be consistent for the same - // query even if new data is added in between the calls. - OOOLastRef ChunkRef - OOOLastMinTime, OOOLastMaxTime int64 } // ChunkFromSamples requires all samples to have the same type. diff --git a/tsdb/head_read.go b/tsdb/head_read.go index 689972f1b..b47e24f9e 100644 --- a/tsdb/head_read.go +++ b/tsdb/head_read.go @@ -487,55 +487,24 @@ func (s *memSeries) oooMergedChunks(meta chunks.Meta, cdm *chunks.ChunkDiskMappe // We create a temporary slice of chunk metas to hold the information of all // possible chunks that may overlap with the requested chunk. - tmpChks := make([]chunkMetaAndChunkDiskMapperRef, 0, len(s.ooo.oooMmappedChunks)) - - oooHeadRef := chunks.ChunkRef(chunks.NewHeadChunkRef(s.ref, s.oooHeadChunkID(len(s.ooo.oooMmappedChunks)))) - if s.ooo.oooHeadChunk != nil && s.ooo.oooHeadChunk.OverlapsClosedInterval(mint, maxt) { - // We only want to append the head chunk if this chunk existed when - // Series() was called. This brings consistency in case new data - // is added in between Series() and Chunk() calls. - if oooHeadRef == meta.OOOLastRef { - tmpChks = append(tmpChks, chunkMetaAndChunkDiskMapperRef{ - meta: chunks.Meta{ - // Ignoring samples added before and after the last known min and max time for this chunk. - MinTime: meta.OOOLastMinTime, - MaxTime: meta.OOOLastMaxTime, - Ref: oooHeadRef, - }, - }) - } - } + tmpChks := make([]chunkMetaAndChunkDiskMapperRef, 0, len(s.ooo.oooMmappedChunks)+1) for i, c := range s.ooo.oooMmappedChunks { - chunkRef := chunks.ChunkRef(chunks.NewHeadChunkRef(s.ref, s.oooHeadChunkID(i))) - // We can skip chunks that came in later than the last known OOOLastRef. - if chunkRef > meta.OOOLastRef { - break - } - - switch { - case chunkRef == meta.OOOLastRef: - tmpChks = append(tmpChks, chunkMetaAndChunkDiskMapperRef{ - meta: chunks.Meta{ - MinTime: meta.OOOLastMinTime, - MaxTime: meta.OOOLastMaxTime, - Ref: chunkRef, - }, - ref: c.ref, - origMinT: c.minTime, - origMaxT: c.maxTime, - }) - case c.OverlapsClosedInterval(mint, maxt): + if c.OverlapsClosedInterval(mint, maxt) { tmpChks = append(tmpChks, chunkMetaAndChunkDiskMapperRef{ meta: chunks.Meta{ MinTime: c.minTime, MaxTime: c.maxTime, - Ref: chunkRef, + Ref: chunks.ChunkRef(chunks.NewHeadChunkRef(s.ref, s.oooHeadChunkID(i))), }, ref: c.ref, }) } } + // Add in data copied from the head OOO chunk. + if meta.Chunk != nil { + tmpChks = append(tmpChks, chunkMetaAndChunkDiskMapperRef{meta: meta}) + } // Next we want to sort all the collected chunks by min time so we can find // those that overlap and stop when we know the rest don't. @@ -548,22 +517,8 @@ func (s *memSeries) oooMergedChunks(meta chunks.Meta, cdm *chunks.ChunkDiskMappe continue } var iterable chunkenc.Iterable - if c.meta.Ref == oooHeadRef { - var xor *chunkenc.XORChunk - var err error - // If head chunk min and max time match the meta OOO markers - // that means that the chunk has not expanded so we can append - // it as it is. - if s.ooo.oooHeadChunk.minTime == meta.OOOLastMinTime && s.ooo.oooHeadChunk.maxTime == meta.OOOLastMaxTime { - xor, err = s.ooo.oooHeadChunk.chunk.ToXOR() // TODO(jesus.vazquez) (This is an optimization idea that has no priority and might not be that useful) See if we could use a copy of the underlying slice. That would leave the more expensive ToXOR() function only for the usecase where Bytes() is called. - } else { - // We need to remove samples that are outside of the markers - xor, err = s.ooo.oooHeadChunk.chunk.ToXORBetweenTimestamps(meta.OOOLastMinTime, meta.OOOLastMaxTime) - } - if err != nil { - return nil, fmt.Errorf("failed to convert ooo head chunk to xor chunk: %w", err) - } - iterable = xor + if c.meta.Chunk != nil { + iterable = c.meta.Chunk } else { chk, err := cdm.Chunk(c.ref) if err != nil { @@ -573,16 +528,7 @@ func (s *memSeries) oooMergedChunks(meta chunks.Meta, cdm *chunks.ChunkDiskMappe } return nil, err } - if c.meta.Ref == meta.OOOLastRef && - (c.origMinT != meta.OOOLastMinTime || c.origMaxT != meta.OOOLastMaxTime) { - // The head expanded and was memory mapped so now we need to - // wrap the chunk within a chunk that doesnt allows us to iterate - // through samples out of the OOOLastMinT and OOOLastMaxT - // markers. - iterable = boundedIterable{chk, meta.OOOLastMinTime, meta.OOOLastMaxTime} - } else { - iterable = chk - } + iterable = chk } mc.chunkIterables = append(mc.chunkIterables, iterable) if c.meta.MaxTime > absoluteMax { @@ -593,74 +539,6 @@ func (s *memSeries) oooMergedChunks(meta chunks.Meta, cdm *chunks.ChunkDiskMappe return mc, nil } -var _ chunkenc.Iterable = &boundedIterable{} - -// boundedIterable is an implementation of chunkenc.Iterable that uses a -// boundedIterator that only iterates through samples which timestamps are -// >= minT and <= maxT. -type boundedIterable struct { - chunk chunkenc.Chunk - minT int64 - maxT int64 -} - -func (b boundedIterable) Iterator(iterator chunkenc.Iterator) chunkenc.Iterator { - it := b.chunk.Iterator(iterator) - if it == nil { - panic("iterator shouldn't be nil") - } - return boundedIterator{it, b.minT, b.maxT} -} - -var _ chunkenc.Iterator = &boundedIterator{} - -// boundedIterator is an implementation of Iterator that only iterates through -// samples which timestamps are >= minT and <= maxT. -type boundedIterator struct { - chunkenc.Iterator - minT int64 - maxT int64 -} - -// Next the first time its called it will advance as many positions as necessary -// until its able to find a sample within the bounds minT and maxT. -// If there are samples within bounds it will advance one by one amongst them. -// If there are no samples within bounds it will return false. -func (b boundedIterator) Next() chunkenc.ValueType { - for b.Iterator.Next() == chunkenc.ValFloat { - t, _ := b.Iterator.At() - switch { - case t < b.minT: - continue - case t > b.maxT: - return chunkenc.ValNone - default: - return chunkenc.ValFloat - } - } - return chunkenc.ValNone -} - -func (b boundedIterator) Seek(t int64) chunkenc.ValueType { - if t < b.minT { - // We must seek at least up to b.minT if it is asked for something before that. - val := b.Iterator.Seek(b.minT) - if !(val == chunkenc.ValFloat) { - return chunkenc.ValNone - } - t, _ := b.Iterator.At() - if t <= b.maxT { - return chunkenc.ValFloat - } - } - if t > b.maxT { - // We seek anyway so that the subsequent Next() calls will also return false. - b.Iterator.Seek(t) - return chunkenc.ValNone - } - return b.Iterator.Seek(t) -} - // safeHeadChunk makes sure that the chunk can be accessed without a race condition. type safeHeadChunk struct { chunkenc.Chunk diff --git a/tsdb/head_read_test.go b/tsdb/head_read_test.go index 8d835e943..6dd4c0ff5 100644 --- a/tsdb/head_read_test.go +++ b/tsdb/head_read_test.go @@ -15,7 +15,6 @@ package tsdb import ( "context" - "fmt" "sync" "testing" @@ -26,150 +25,6 @@ import ( "github.com/prometheus/prometheus/tsdb/chunks" ) -func TestBoundedChunk(t *testing.T) { - tests := []struct { - name string - inputChunk chunkenc.Chunk - inputMinT int64 - inputMaxT int64 - initialSeek int64 - seekIsASuccess bool - expSamples []sample - }{ - { - name: "if there are no samples it returns nothing", - inputChunk: newTestChunk(0), - expSamples: nil, - }, - { - name: "bounds represent a single sample", - inputChunk: newTestChunk(10), - expSamples: []sample{ - {0, 0, nil, nil}, - }, - }, - { - name: "if there are bounds set only samples within them are returned", - inputChunk: newTestChunk(10), - inputMinT: 1, - inputMaxT: 8, - expSamples: []sample{ - {1, 1, nil, nil}, - {2, 2, nil, nil}, - {3, 3, nil, nil}, - {4, 4, nil, nil}, - {5, 5, nil, nil}, - {6, 6, nil, nil}, - {7, 7, nil, nil}, - {8, 8, nil, nil}, - }, - }, - { - name: "if bounds set and only maxt is less than actual maxt", - inputChunk: newTestChunk(10), - inputMinT: 0, - inputMaxT: 5, - expSamples: []sample{ - {0, 0, nil, nil}, - {1, 1, nil, nil}, - {2, 2, nil, nil}, - {3, 3, nil, nil}, - {4, 4, nil, nil}, - {5, 5, nil, nil}, - }, - }, - { - name: "if bounds set and only mint is more than actual mint", - inputChunk: newTestChunk(10), - inputMinT: 5, - inputMaxT: 9, - expSamples: []sample{ - {5, 5, nil, nil}, - {6, 6, nil, nil}, - {7, 7, nil, nil}, - {8, 8, nil, nil}, - {9, 9, nil, nil}, - }, - }, - { - name: "if there are bounds set with seek before mint", - inputChunk: newTestChunk(10), - inputMinT: 3, - inputMaxT: 7, - initialSeek: 1, - seekIsASuccess: true, - expSamples: []sample{ - {3, 3, nil, nil}, - {4, 4, nil, nil}, - {5, 5, nil, nil}, - {6, 6, nil, nil}, - {7, 7, nil, nil}, - }, - }, - { - name: "if there are bounds set with seek between mint and maxt", - inputChunk: newTestChunk(10), - inputMinT: 3, - inputMaxT: 7, - initialSeek: 5, - seekIsASuccess: true, - expSamples: []sample{ - {5, 5, nil, nil}, - {6, 6, nil, nil}, - {7, 7, nil, nil}, - }, - }, - { - name: "if there are bounds set with seek after maxt", - inputChunk: newTestChunk(10), - inputMinT: 3, - inputMaxT: 7, - initialSeek: 8, - seekIsASuccess: false, - }, - } - for _, tc := range tests { - t.Run(fmt.Sprintf("name=%s", tc.name), func(t *testing.T) { - iterable := boundedIterable{tc.inputChunk, tc.inputMinT, tc.inputMaxT} - - var samples []sample - it := iterable.Iterator(nil) - - if tc.initialSeek != 0 { - // Testing Seek() - val := it.Seek(tc.initialSeek) - require.Equal(t, tc.seekIsASuccess, val == chunkenc.ValFloat) - if val == chunkenc.ValFloat { - t, v := it.At() - samples = append(samples, sample{t, v, nil, nil}) - } - } - - // Testing Next() - for it.Next() == chunkenc.ValFloat { - t, v := it.At() - samples = append(samples, sample{t, v, nil, nil}) - } - - // it.Next() should keep returning no value. - for i := 0; i < 10; i++ { - require.Equal(t, chunkenc.ValNone, it.Next()) - } - - require.Equal(t, tc.expSamples, samples) - }) - } -} - -func newTestChunk(numSamples int) chunkenc.Chunk { - xor := chunkenc.NewXORChunk() - a, _ := xor.Appender() - for i := 0; i < numSamples; i++ { - a.Append(int64(i), float64(i)) - } - return xor -} - // TestMemSeries_chunk runs a series of tests on memSeries.chunk() calls. // It will simulate various conditions to ensure all code paths in that function are covered. func TestMemSeries_chunk(t *testing.T) { diff --git a/tsdb/ooo_head_read.go b/tsdb/ooo_head_read.go index 3b5adf80c..47972c3cc 100644 --- a/tsdb/ooo_head_read.go +++ b/tsdb/ooo_head_read.go @@ -94,48 +94,32 @@ func (oh *OOOHeadIndexReader) series(ref storage.SeriesRef, builder *labels.Scra tmpChks := make([]chunks.Meta, 0, len(s.ooo.oooMmappedChunks)) - // We define these markers to track the last chunk reference while we - // fill the chunk meta. - // These markers are useful to give consistent responses to repeated queries - // even if new chunks that might be overlapping or not are added afterwards. - // Also, lastMinT and lastMaxT are initialized to the max int as a sentinel - // value to know they are unset. - var lastChunkRef chunks.ChunkRef - lastMinT, lastMaxT := int64(math.MaxInt64), int64(math.MaxInt64) - - addChunk := func(minT, maxT int64, ref chunks.ChunkRef) { - // the first time we get called is for the last included chunk. - // set the markers accordingly - if lastMinT == int64(math.MaxInt64) { - lastChunkRef = ref - lastMinT = minT - lastMaxT = maxT - } - + addChunk := func(minT, maxT int64, ref chunks.ChunkRef, chunk chunkenc.Chunk) { tmpChks = append(tmpChks, chunks.Meta{ - MinTime: minT, - MaxTime: maxT, - Ref: ref, - OOOLastRef: lastChunkRef, - OOOLastMinTime: lastMinT, - OOOLastMaxTime: lastMaxT, + MinTime: minT, + MaxTime: maxT, + Ref: ref, + Chunk: chunk, }) } - // Collect all chunks that overlap the query range, in order from most recent to most old, - // so we can set the correct markers. + // Collect all chunks that overlap the query range. if s.ooo.oooHeadChunk != nil { c := s.ooo.oooHeadChunk if c.OverlapsClosedInterval(oh.mint, oh.maxt) && maxMmapRef == 0 { ref := chunks.ChunkRef(chunks.NewHeadChunkRef(s.ref, s.oooHeadChunkID(len(s.ooo.oooMmappedChunks)))) - addChunk(c.minTime, c.maxTime, ref) + var xor chunkenc.Chunk + if len(c.chunk.samples) > 0 { // Empty samples happens in tests, at least. + xor, _ = c.chunk.ToXOR() // Ignoring error because it can't fail. + } + addChunk(c.minTime, c.maxTime, ref, xor) } } for i := len(s.ooo.oooMmappedChunks) - 1; i >= 0; i-- { c := s.ooo.oooMmappedChunks[i] if c.OverlapsClosedInterval(oh.mint, oh.maxt) && (maxMmapRef == 0 || maxMmapRef.GreaterThanOrEqualTo(c.ref)) && (lastGarbageCollectedMmapRef == 0 || c.ref.GreaterThan(lastGarbageCollectedMmapRef)) { ref := chunks.ChunkRef(chunks.NewHeadChunkRef(s.ref, s.oooHeadChunkID(i))) - addChunk(c.minTime, c.maxTime, ref) + addChunk(c.minTime, c.maxTime, ref, nil) } } @@ -163,6 +147,12 @@ func (oh *OOOHeadIndexReader) series(ref storage.SeriesRef, builder *labels.Scra case c.MaxTime > maxTime: maxTime = c.MaxTime (*chks)[len(*chks)-1].MaxTime = c.MaxTime + fallthrough + default: + // If the head OOO chunk is part of an output chunk, copy the chunk pointer. + if c.Chunk != nil { + (*chks)[len(*chks)-1].Chunk = c.Chunk + } } } @@ -185,10 +175,8 @@ func (oh *OOOHeadIndexReader) LabelValues(ctx context.Context, name string, matc } type chunkMetaAndChunkDiskMapperRef struct { - meta chunks.Meta - ref chunks.ChunkDiskMapperRef - origMinT int64 - origMaxT int64 + meta chunks.Meta + ref chunks.ChunkDiskMapperRef } func refLessByMinTimeAndMinRef(a, b chunkMetaAndChunkDiskMapperRef) int { diff --git a/tsdb/ooo_head_read_test.go b/tsdb/ooo_head_read_test.go index 1716f29b5..ce1fff100 100644 --- a/tsdb/ooo_head_read_test.go +++ b/tsdb/ooo_head_read_test.go @@ -304,18 +304,6 @@ func TestOOOHeadIndexReader_Series(t *testing.T) { s1, _, _ := h.getOrCreate(s1ID, s1Lset) s1.ooo = &memSeriesOOOFields{} - var lastChunk chunkInterval - var lastChunkPos int - - // the marker should be set based on whichever is the last chunk/interval that overlaps with the query range - for i, interv := range intervals { - if overlapsClosedInterval(interv.mint, interv.maxt, tc.queryMinT, tc.queryMaxT) { - lastChunk = interv - lastChunkPos = i - } - } - lastChunkRef := chunks.ChunkRef(chunks.NewHeadChunkRef(1, chunks.HeadChunkID(uint64(lastChunkPos)))) - // define our expected chunks, by looking at the expected ChunkIntervals and setting... var expChunks []chunks.Meta for _, e := range tc.expChunks { @@ -323,10 +311,6 @@ func TestOOOHeadIndexReader_Series(t *testing.T) { Chunk: chunkenc.Chunk(nil), MinTime: e.mint, MaxTime: e.maxt, - // markers based on the last chunk we found above - OOOLastMinTime: lastChunk.mint, - OOOLastMaxTime: lastChunk.maxt, - OOOLastRef: lastChunkRef, } // Ref to whatever Ref the chunk has, that we refer to by ID @@ -343,6 +327,7 @@ func TestOOOHeadIndexReader_Series(t *testing.T) { if headChunk && len(intervals) > 0 { // Put the last interval in the head chunk s1.ooo.oooHeadChunk = &oooHeadChunk{ + chunk: NewOOOChunk(), minTime: intervals[len(intervals)-1].mint, maxTime: intervals[len(intervals)-1].maxt, } @@ -842,8 +827,8 @@ func TestOOOHeadChunkReader_Chunk(t *testing.T) { } require.NoError(t, app.Commit()) - // The Series method is the one that populates the chunk meta OOO - // markers like OOOLastRef. These are then used by the ChunkReader. + // The Series method populates the chunk metas, taking a copy of the + // head OOO chunk if necessary. These are then used by the ChunkReader. ir := NewOOOHeadIndexReader(db.head, tc.queryMinT, tc.queryMaxT, 0) var chks []chunks.Meta var b labels.ScratchBuilder @@ -939,7 +924,6 @@ func TestOOOHeadChunkReader_Chunk_ConsistentQueryResponseDespiteOfHeadExpanding( sample{t: minutes(25), f: float64(1)}, sample{t: minutes(26), f: float64(0)}, sample{t: minutes(30), f: float64(0)}, - sample{t: minutes(32), f: float64(1)}, // This sample was added after Series() but before Chunk() and its in between the lastmint and maxt so it should be kept sample{t: minutes(35), f: float64(1)}, }, }, @@ -985,7 +969,6 @@ func TestOOOHeadChunkReader_Chunk_ConsistentQueryResponseDespiteOfHeadExpanding( sample{t: minutes(25), f: float64(1)}, sample{t: minutes(26), f: float64(0)}, sample{t: minutes(30), f: float64(0)}, - sample{t: minutes(32), f: float64(1)}, // This sample was added after Series() but before Chunk() and its in between the lastmint and maxt so it should be kept sample{t: minutes(35), f: float64(1)}, }, }, @@ -1007,8 +990,8 @@ func TestOOOHeadChunkReader_Chunk_ConsistentQueryResponseDespiteOfHeadExpanding( } require.NoError(t, app.Commit()) - // The Series method is the one that populates the chunk meta OOO - // markers like OOOLastRef. These are then used by the ChunkReader. + // The Series method populates the chunk metas, taking a copy of the + // head OOO chunk if necessary. These are then used by the ChunkReader. ir := NewOOOHeadIndexReader(db.head, tc.queryMinT, tc.queryMaxT, 0) var chks []chunks.Meta var b labels.ScratchBuilder