Add support for handling multiple chunks in OOO head

Signed-off-by: Carrie Edwards <edwrdscarrie@gmail.com>
Co-authored by: Jeanette Tan <jeanette.tan@grafana.com>:
Co-authored by: György Krajcsovits <gyorgy.krajcsovits@grafana.com>:
Signed-off-by: Carrie Edwards <edwrdscarrie@gmail.com>
Co-authored by: Fiona Liao <fiona.liao@grafana.com>:
This commit is contained in:
Carrie Edwards 2024-07-08 09:48:27 -07:00
parent ba948e94fb
commit 2e0e4e9ce9
4 changed files with 162 additions and 66 deletions

View file

@ -846,16 +846,17 @@ func (a *headAppender) Commit() (err error) {
// number of samples rejected due to: out of bounds: with t < minValidTime (OOO support disabled)
floatOOBRejected int
inOrderMint int64 = math.MaxInt64
inOrderMaxt int64 = math.MinInt64
ooomint int64 = math.MaxInt64
ooomaxt int64 = math.MinInt64
wblSamples []record.RefSample
oooMmapMarkers map[chunks.HeadSeriesRef]chunks.ChunkDiskMapperRef
oooRecords [][]byte
oooCapMax = a.head.opts.OutOfOrderCapMax.Load()
series *memSeries
appendChunkOpts = chunkOpts{
inOrderMint int64 = math.MaxInt64
inOrderMaxt int64 = math.MinInt64
ooomint int64 = math.MaxInt64
ooomaxt int64 = math.MinInt64
wblSamples []record.RefSample
oooMmapMarkers map[chunks.HeadSeriesRef][]chunks.ChunkDiskMapperRef
oooMmapMarkersCount int
oooRecords [][]byte
oooCapMax = a.head.opts.OutOfOrderCapMax.Load()
series *memSeries
appendChunkOpts = chunkOpts{
chunkDiskMapper: a.head.chunkDiskMapper,
chunkRange: a.head.chunkRange.Load(),
samplesPerChunk: a.head.opts.SamplesPerChunk,
@ -872,6 +873,7 @@ func (a *headAppender) Commit() (err error) {
// WBL is not enabled. So no need to collect.
wblSamples = nil
oooMmapMarkers = nil
oooMmapMarkersCount = 0
return
}
// The m-map happens before adding a new sample. So we collect
@ -880,12 +882,14 @@ func (a *headAppender) Commit() (err error) {
// WBL Before this Commit(): [old samples before this commit for chunk 1]
// WBL After this Commit(): [old samples before this commit for chunk 1][new samples in this commit for chunk 1]mmapmarker1[samples for chunk 2]mmapmarker2[samples for chunk 3]
if oooMmapMarkers != nil {
markers := make([]record.RefMmapMarker, 0, len(oooMmapMarkers))
for ref, mmapRef := range oooMmapMarkers {
markers = append(markers, record.RefMmapMarker{
Ref: ref,
MmapRef: mmapRef,
})
markers := make([]record.RefMmapMarker, 0, oooMmapMarkersCount)
for ref, mmapRefs := range oooMmapMarkers {
for _, mmapRef := range mmapRefs {
markers = append(markers, record.RefMmapMarker{
Ref: ref,
MmapRef: mmapRef,
})
}
}
r := enc.MmapMarkers(markers, a.head.getBytesBuffer())
oooRecords = append(oooRecords, r)
@ -928,11 +932,11 @@ func (a *headAppender) Commit() (err error) {
case oooSample:
// Sample is OOO and OOO handling is enabled
// and the delta is within the OOO tolerance.
var mmapRef chunks.ChunkDiskMapperRef
ok, chunkCreated, mmapRef = series.insert(s.T, s.V, a.head.chunkDiskMapper, oooCapMax)
var mmapRefs []chunks.ChunkDiskMapperRef
ok, chunkCreated, mmapRefs = series.insert(s.T, s.V, a.head.chunkDiskMapper, oooCapMax)
if chunkCreated {
r, ok := oooMmapMarkers[series.ref]
if !ok || r != 0 {
if !ok || r != nil {
// !ok means there are no markers collected for these samples yet. So we first flush the samples
// before setting this m-map marker.
@ -943,9 +947,16 @@ func (a *headAppender) Commit() (err error) {
}
if oooMmapMarkers == nil {
oooMmapMarkers = make(map[chunks.HeadSeriesRef]chunks.ChunkDiskMapperRef)
oooMmapMarkers = make(map[chunks.HeadSeriesRef][]chunks.ChunkDiskMapperRef)
}
if len(mmapRefs) > 0 {
oooMmapMarkers[series.ref] = mmapRefs
oooMmapMarkersCount += len(mmapRefs)
} else {
// No chunk was written to disk, so we need to set an initial marker for this series.
oooMmapMarkers[series.ref] = []chunks.ChunkDiskMapperRef{0}
oooMmapMarkersCount++
}
oooMmapMarkers[series.ref] = mmapRef
}
if ok {
wblSamples = append(wblSamples, s)
@ -1069,14 +1080,14 @@ func (a *headAppender) Commit() (err error) {
}
// insert is like append, except it inserts. Used for OOO samples.
func (s *memSeries) insert(t int64, v float64, chunkDiskMapper *chunks.ChunkDiskMapper, oooCapMax int64) (inserted, chunkCreated bool, mmapRef chunks.ChunkDiskMapperRef) {
func (s *memSeries) insert(t int64, v float64, chunkDiskMapper *chunks.ChunkDiskMapper, oooCapMax int64) (inserted, chunkCreated bool, mmapRefs []chunks.ChunkDiskMapperRef) {
if s.ooo == nil {
s.ooo = &memSeriesOOOFields{}
}
c := s.ooo.oooHeadChunk
if c == nil || c.chunk.NumSamples() == int(oooCapMax) {
// Note: If no new samples come in then we rely on compaction to clean up stale in-memory OOO chunks.
c, mmapRef = s.cutNewOOOHeadChunk(t, chunkDiskMapper)
c, mmapRefs = s.cutNewOOOHeadChunk(t, chunkDiskMapper)
chunkCreated = true
}
@ -1089,7 +1100,7 @@ func (s *memSeries) insert(t int64, v float64, chunkDiskMapper *chunks.ChunkDisk
c.maxTime = t
}
}
return ok, chunkCreated, mmapRef
return ok, chunkCreated, mmapRefs
}
// chunkOpts are chunk-level options that are passed when appending to a memSeries.
@ -1431,7 +1442,7 @@ func (s *memSeries) cutNewHeadChunk(mint int64, e chunkenc.Encoding, chunkRange
// cutNewOOOHeadChunk cuts a new OOO chunk and m-maps the old chunk.
// The caller must ensure that s.ooo is not nil.
func (s *memSeries) cutNewOOOHeadChunk(mint int64, chunkDiskMapper *chunks.ChunkDiskMapper) (*oooHeadChunk, chunks.ChunkDiskMapperRef) {
func (s *memSeries) cutNewOOOHeadChunk(mint int64, chunkDiskMapper *chunks.ChunkDiskMapper) (*oooHeadChunk, []chunks.ChunkDiskMapperRef) {
ref := s.mmapCurrentOOOHeadChunk(chunkDiskMapper)
s.ooo.oooHeadChunk = &oooHeadChunk{
@ -1443,21 +1454,29 @@ func (s *memSeries) cutNewOOOHeadChunk(mint int64, chunkDiskMapper *chunks.Chunk
return s.ooo.oooHeadChunk, ref
}
func (s *memSeries) mmapCurrentOOOHeadChunk(chunkDiskMapper *chunks.ChunkDiskMapper) chunks.ChunkDiskMapperRef {
func (s *memSeries) mmapCurrentOOOHeadChunk(chunkDiskMapper *chunks.ChunkDiskMapper) []chunks.ChunkDiskMapperRef {
if s.ooo == nil || s.ooo.oooHeadChunk == nil {
// There is no head chunk, so nothing to m-map here.
return 0
// OOO is not enabled or there is no head chunk, so nothing to m-map here.
return nil
}
chks, err := s.ooo.oooHeadChunk.chunk.ToEncodedChunks(math.MinInt64, math.MaxInt64)
if err != nil {
handleChunkWriteError(err)
return nil
}
chunkRefs := make([]chunks.ChunkDiskMapperRef, 0, 1)
for _, memchunk := range chks {
chunkRef := chunkDiskMapper.WriteChunk(s.ref, s.ooo.oooHeadChunk.minTime, s.ooo.oooHeadChunk.maxTime, memchunk.chunk, true, handleChunkWriteError)
chunkRefs = append(chunkRefs, chunkRef)
s.ooo.oooMmappedChunks = append(s.ooo.oooMmappedChunks, &mmappedChunk{
ref: chunkRef,
numSamples: uint16(memchunk.chunk.NumSamples()),
minTime: memchunk.minTime,
maxTime: memchunk.maxTime,
})
}
xor, _ := s.ooo.oooHeadChunk.chunk.ToXOR() // Encode to XorChunk which is more compact and implements all of the needed functionality.
chunkRef := chunkDiskMapper.WriteChunk(s.ref, s.ooo.oooHeadChunk.minTime, s.ooo.oooHeadChunk.maxTime, xor, true, handleChunkWriteError)
s.ooo.oooMmappedChunks = append(s.ooo.oooMmappedChunks, &mmappedChunk{
ref: chunkRef,
numSamples: uint16(xor.NumSamples()),
minTime: s.ooo.oooHeadChunk.minTime,
maxTime: s.ooo.oooHeadChunk.maxTime,
})
s.ooo.oooHeadChunk = nil
return chunkRef
return chunkRefs
}
// mmapChunks will m-map all but first chunk on s.headChunks list.

View file

@ -4792,9 +4792,11 @@ func TestWBLReplay(t *testing.T) {
require.False(t, ok)
require.NotNil(t, ms)
xor, err := ms.ooo.oooHeadChunk.chunk.ToXOR()
chks, err := ms.ooo.oooHeadChunk.chunk.ToEncodedChunks(math.MinInt64, math.MaxInt64)
require.NoError(t, err)
require.Len(t, chks, 1)
xor := chks[0].chunk.(*chunkenc.XORChunk)
it := xor.Iterator(nil)
actOOOSamples := make([]sample, 0, len(expOOOSamples))
for it.Next() == chunkenc.ValFloat {

View file

@ -15,11 +15,11 @@ package tsdb
import (
"fmt"
"github.com/prometheus/prometheus/tsdb/chunkenc"
"sort"
"github.com/oklog/ulid"
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/prometheus/prometheus/tsdb/chunks"
"github.com/prometheus/prometheus/tsdb/tombstones"
)
@ -74,24 +74,22 @@ func (o *OOOChunk) NumSamples() int {
return len(o.samples)
}
func (o *OOOChunk) ToXOR() (*chunkenc.XORChunk, error) {
x := chunkenc.NewXORChunk()
app, err := x.Appender()
if err != nil {
return nil, err
}
for _, s := range o.samples {
app.Append(s.t, s.f)
}
return x, nil
}
func (o *OOOChunk) ToXORBetweenTimestamps(mint, maxt int64) (*chunkenc.XORChunk, error) {
x := chunkenc.NewXORChunk()
app, err := x.Appender()
if err != nil {
return nil, err
// ToEncodedChunks returns chunks with the samples in the OOOChunk.
//
//nolint:revive // unexported-return.
func (o *OOOChunk) ToEncodedChunks(mint, maxt int64) (chks []memChunk, err error) {
if len(o.samples) == 0 {
return nil, nil
}
// The most common case is that there will be a single chunk, with the same type of samples in it - this is always true for float samples.
chks = make([]memChunk, 0, 1)
var (
cmint int64
cmaxt int64
chunk chunkenc.Chunk
app chunkenc.Appender
)
prevEncoding := chunkenc.EncNone // Yes we could call the chunk for this, but this is more efficient.
for _, s := range o.samples {
if s.t < mint {
continue
@ -99,9 +97,77 @@ func (o *OOOChunk) ToXORBetweenTimestamps(mint, maxt int64) (*chunkenc.XORChunk,
if s.t > maxt {
break
}
app.Append(s.t, s.f)
encoding := chunkenc.EncXOR
if s.h != nil {
encoding = chunkenc.EncHistogram
} else if s.fh != nil {
encoding = chunkenc.EncFloatHistogram
}
// prevApp is the appender for the previous sample.
prevApp := app
if encoding != prevEncoding { // For the first sample, this will always be true as EncNone != EncXOR | EncHistogram | EncFloatHistogram
if prevEncoding != chunkenc.EncNone {
chks = append(chks, memChunk{chunk, cmint, cmaxt, nil})
}
cmint = s.t
switch encoding {
case chunkenc.EncXOR:
chunk = chunkenc.NewXORChunk()
case chunkenc.EncHistogram:
chunk = chunkenc.NewHistogramChunk()
case chunkenc.EncFloatHistogram:
chunk = chunkenc.NewFloatHistogramChunk()
default:
chunk = chunkenc.NewXORChunk()
}
app, err = chunk.Appender()
if err != nil {
return
}
}
switch encoding {
case chunkenc.EncXOR:
app.Append(s.t, s.f)
case chunkenc.EncHistogram:
// Ignoring ok is ok, since we don't want to compare to the wrong previous appender anyway.
prevHApp, _ := prevApp.(*chunkenc.HistogramAppender)
var (
newChunk chunkenc.Chunk
recoded bool
)
newChunk, recoded, app, _ = app.AppendHistogram(prevHApp, s.t, s.h, false)
if newChunk != nil { // A new chunk was allocated.
if !recoded {
chks = append(chks, memChunk{chunk, cmint, cmaxt, nil})
}
chunk = newChunk
cmint = s.t
}
case chunkenc.EncFloatHistogram:
// Ignoring ok is ok, since we don't want to compare to the wrong previous appender anyway.
prevHApp, _ := prevApp.(*chunkenc.FloatHistogramAppender)
var (
newChunk chunkenc.Chunk
recoded bool
)
newChunk, recoded, app, _ = app.AppendFloatHistogram(prevHApp, s.t, s.fh, false)
if newChunk != nil { // A new chunk was allocated.
if !recoded {
chks = append(chks, memChunk{chunk, cmint, cmaxt, nil})
}
chunk = newChunk
cmint = s.t
}
}
cmaxt = s.t
prevEncoding = encoding
}
return x, nil
if prevEncoding != chunkenc.EncNone {
chks = append(chks, memChunk{chunk, cmint, cmaxt, nil})
}
return chks, nil
}
var _ BlockReader = &OOORangeHead{}

View file

@ -108,11 +108,19 @@ func (oh *OOOHeadIndexReader) series(ref storage.SeriesRef, builder *labels.Scra
c := s.ooo.oooHeadChunk
if c.OverlapsClosedInterval(oh.mint, oh.maxt) && maxMmapRef == 0 {
ref := chunks.ChunkRef(chunks.NewHeadChunkRef(s.ref, s.oooHeadChunkID(len(s.ooo.oooMmappedChunks))))
var xor chunkenc.Chunk
if len(c.chunk.samples) > 0 { // Empty samples happens in tests, at least.
xor, _ = c.chunk.ToXOR() // Ignoring error because it can't fail.
chks, err := s.ooo.oooHeadChunk.chunk.ToEncodedChunks(c.minTime, c.maxTime)
if err != nil {
handleChunkWriteError(err)
return nil
}
for _, chk := range chks {
addChunk(c.minTime, c.maxTime, ref, chk.chunk)
}
} else {
var enc chunkenc.Chunk
addChunk(c.minTime, c.maxTime, ref, enc)
}
addChunk(c.minTime, c.maxTime, ref, xor)
}
}
for i := len(s.ooo.oooMmappedChunks) - 1; i >= 0; i-- {
@ -341,14 +349,15 @@ func NewOOOCompactionHead(ctx context.Context, head *Head) (*OOOCompactionHead,
continue
}
mmapRef := ms.mmapCurrentOOOHeadChunk(head.chunkDiskMapper)
if mmapRef == 0 && len(ms.ooo.oooMmappedChunks) > 0 {
mmapRefs := ms.mmapCurrentOOOHeadChunk(head.chunkDiskMapper)
if len(mmapRefs) == 0 && len(ms.ooo.oooMmappedChunks) > 0 {
// Nothing was m-mapped. So take the mmapRef from the existing slice if it exists.
mmapRef = ms.ooo.oooMmappedChunks[len(ms.ooo.oooMmappedChunks)-1].ref
mmapRefs = []chunks.ChunkDiskMapperRef{ms.ooo.oooMmappedChunks[len(ms.ooo.oooMmappedChunks)-1].ref}
}
seq, off := mmapRef.Unpack()
lastMmapRef := mmapRefs[len(mmapRefs)-1]
seq, off := lastMmapRef.Unpack()
if seq > lastSeq || (seq == lastSeq && off > lastOff) {
ch.lastMmapRef, lastSeq, lastOff = mmapRef, seq, off
ch.lastMmapRef, lastSeq, lastOff = lastMmapRef, seq, off
}
if len(ms.ooo.oooMmappedChunks) > 0 {
ch.postings = append(ch.postings, seriesRef)