Merge pull request #11779 from codesome/memseries-ooo

tsdb: Only initialise out-of-order fields when required
This commit is contained in:
Ganesh Vernekar 2023-01-16 10:58:05 +05:30 committed by GitHub
commit cb2be6e62f
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 118 additions and 76 deletions

View file

@@ -4095,8 +4095,7 @@ func TestOOOCompaction(t *testing.T) {
ms, created, err := db.head.getOrCreate(lbls.Hash(), lbls) ms, created, err := db.head.getOrCreate(lbls.Hash(), lbls)
require.NoError(t, err) require.NoError(t, err)
require.False(t, created) require.False(t, created)
require.Nil(t, ms.oooHeadChunk) require.Nil(t, ms.ooo)
require.Equal(t, 0, len(ms.oooMmappedChunks))
} }
checkEmptyOOOChunk(series1) checkEmptyOOOChunk(series1)
checkEmptyOOOChunk(series2) checkEmptyOOOChunk(series2)
@@ -4138,8 +4137,8 @@ func TestOOOCompaction(t *testing.T) {
ms, created, err := db.head.getOrCreate(lbls.Hash(), lbls) ms, created, err := db.head.getOrCreate(lbls.Hash(), lbls)
require.NoError(t, err) require.NoError(t, err)
require.False(t, created) require.False(t, created)
require.Greater(t, ms.oooHeadChunk.chunk.NumSamples(), 0) require.Greater(t, ms.ooo.oooHeadChunk.chunk.NumSamples(), 0)
require.Equal(t, 14, len(ms.oooMmappedChunks)) // 7 original, 7 duplicate. require.Equal(t, 14, len(ms.ooo.oooMmappedChunks)) // 7 original, 7 duplicate.
} }
checkNonEmptyOOOChunk(series1) checkNonEmptyOOOChunk(series1)
checkNonEmptyOOOChunk(series2) checkNonEmptyOOOChunk(series2)
@@ -4278,7 +4277,7 @@ func TestOOOCompactionWithNormalCompaction(t *testing.T) {
ms, created, err := db.head.getOrCreate(lbls.Hash(), lbls) ms, created, err := db.head.getOrCreate(lbls.Hash(), lbls)
require.NoError(t, err) require.NoError(t, err)
require.False(t, created) require.False(t, created)
require.Greater(t, ms.oooHeadChunk.chunk.NumSamples(), 0) require.Greater(t, ms.ooo.oooHeadChunk.chunk.NumSamples(), 0)
} }
// If the normal Head is not compacted, the OOO head compaction does not take place. // If the normal Head is not compacted, the OOO head compaction does not take place.
@@ -4306,8 +4305,7 @@ func TestOOOCompactionWithNormalCompaction(t *testing.T) {
ms, created, err := db.head.getOrCreate(lbls.Hash(), lbls) ms, created, err := db.head.getOrCreate(lbls.Hash(), lbls)
require.NoError(t, err) require.NoError(t, err)
require.False(t, created) require.False(t, created)
require.Nil(t, ms.oooHeadChunk) require.Nil(t, ms.ooo)
require.Equal(t, 0, len(ms.oooMmappedChunks))
} }
verifySamples := func(block *Block, fromMins, toMins int64) { verifySamples := func(block *Block, fromMins, toMins int64) {
@@ -4700,8 +4698,7 @@ func TestOOODisabled(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
require.False(t, created) require.False(t, created)
require.NotNil(t, ms) require.NotNil(t, ms)
require.Nil(t, ms.oooHeadChunk) require.Nil(t, ms.ooo)
require.Len(t, ms.oooMmappedChunks, 0)
} }
func TestWBLAndMmapReplay(t *testing.T) { func TestWBLAndMmapReplay(t *testing.T) {
@@ -4765,7 +4762,7 @@ func TestWBLAndMmapReplay(t *testing.T) {
require.False(t, created) require.False(t, created)
require.NoError(t, err) require.NoError(t, err)
var s1MmapSamples []tsdbutil.Sample var s1MmapSamples []tsdbutil.Sample
for _, mc := range ms.oooMmappedChunks { for _, mc := range ms.ooo.oooMmappedChunks {
chk, err := db.head.chunkDiskMapper.Chunk(mc.ref) chk, err := db.head.chunkDiskMapper.Chunk(mc.ref)
require.NoError(t, err) require.NoError(t, err)
it := chk.Iterator(nil) it := chk.Iterator(nil)
@@ -4972,8 +4969,7 @@ func TestOOOCompactionFailure(t *testing.T) {
ms, created, err := db.head.getOrCreate(series1.Hash(), series1) ms, created, err := db.head.getOrCreate(series1.Hash(), series1)
require.NoError(t, err) require.NoError(t, err)
require.False(t, created) require.False(t, created)
require.Nil(t, ms.oooHeadChunk) require.Nil(t, ms.ooo)
require.Len(t, ms.oooMmappedChunks, 0)
// The failed compaction should not have left the ooo Head corrupted. // The failed compaction should not have left the ooo Head corrupted.
// Hence, expect no new blocks with another OOO compaction call. // Hence, expect no new blocks with another OOO compaction call.
@@ -5778,7 +5774,7 @@ func TestDiskFillingUpAfterDisablingOOO(t *testing.T) {
db.DisableCompactions() db.DisableCompactions()
ms := db.head.series.getByHash(series1.Hash(), series1) ms := db.head.series.getByHash(series1.Hash(), series1)
require.Greater(t, len(ms.oooMmappedChunks), 0, "OOO mmap chunk was not replayed") require.Greater(t, len(ms.ooo.oooMmappedChunks), 0, "OOO mmap chunk was not replayed")
checkMmapFileContents := func(contains, notContains []string) { checkMmapFileContents := func(contains, notContains []string) {
mmapDir := mmappedChunksDir(db.head.opts.ChunkDirRoot) mmapDir := mmappedChunksDir(db.head.opts.ChunkDirRoot)
@@ -5806,7 +5802,7 @@ func TestDiskFillingUpAfterDisablingOOO(t *testing.T) {
checkMmapFileContents([]string{"000001", "000002"}, nil) checkMmapFileContents([]string{"000001", "000002"}, nil)
require.NoError(t, db.Compact()) require.NoError(t, db.Compact())
checkMmapFileContents([]string{"000002"}, []string{"000001"}) checkMmapFileContents([]string{"000002"}, []string{"000001"})
require.Equal(t, 0, len(ms.oooMmappedChunks), "OOO mmap chunk was not compacted") require.Nil(t, ms.ooo, "OOO mmap chunk was not compacted")
addSamples(501, 650) addSamples(501, 650)
checkMmapFileContents([]string{"000002", "000003"}, []string{"000001"}) checkMmapFileContents([]string{"000002", "000003"}, []string{"000001"})

View file

@@ -763,7 +763,11 @@ func (h *Head) loadMmappedChunks(refSeries map[chunks.HeadSeriesRef]*memSeries)
h.metrics.chunks.Inc() h.metrics.chunks.Inc()
h.metrics.chunksCreated.Inc() h.metrics.chunksCreated.Inc()
ms.oooMmappedChunks = append(ms.oooMmappedChunks, &mmappedChunk{ if ms.ooo == nil {
ms.ooo = &memSeriesOOOFields{}
}
ms.ooo.oooMmappedChunks = append(ms.ooo.oooMmappedChunks, &mmappedChunk{
ref: chunkRef, ref: chunkRef,
minTime: mint, minTime: mint,
maxTime: maxt, maxTime: maxt,
@@ -1666,24 +1670,24 @@ func (s *stripeSeries) gc(mint int64, minOOOMmapRef chunks.ChunkDiskMapperRef) (
minMmapFile = seq minMmapFile = seq
} }
} }
if len(series.oooMmappedChunks) > 0 { if series.ooo != nil && len(series.ooo.oooMmappedChunks) > 0 {
seq, _ := series.oooMmappedChunks[0].ref.Unpack() seq, _ := series.ooo.oooMmappedChunks[0].ref.Unpack()
if seq < minMmapFile { if seq < minMmapFile {
minMmapFile = seq minMmapFile = seq
} }
for _, ch := range series.oooMmappedChunks { for _, ch := range series.ooo.oooMmappedChunks {
if ch.minTime < minOOOTime { if ch.minTime < minOOOTime {
minOOOTime = ch.minTime minOOOTime = ch.minTime
} }
} }
} }
if series.oooHeadChunk != nil { if series.ooo != nil && series.ooo.oooHeadChunk != nil {
if series.oooHeadChunk.minTime < minOOOTime { if series.ooo.oooHeadChunk.minTime < minOOOTime {
minOOOTime = series.oooHeadChunk.minTime minOOOTime = series.ooo.oooHeadChunk.minTime
} }
} }
if len(series.mmappedChunks) > 0 || len(series.oooMmappedChunks) > 0 || if len(series.mmappedChunks) > 0 || series.headChunk != nil || series.pendingCommit ||
series.headChunk != nil || series.oooHeadChunk != nil || series.pendingCommit { (series.ooo != nil && (len(series.ooo.oooMmappedChunks) > 0 || series.ooo.oooHeadChunk != nil)) {
seriesMint := series.minTime() seriesMint := series.minTime()
if seriesMint < actualMint { if seriesMint < actualMint {
actualMint = seriesMint actualMint = seriesMint
@@ -1840,9 +1844,7 @@ type memSeries struct {
headChunk *memChunk // Most recent chunk in memory that's still being built. headChunk *memChunk // Most recent chunk in memory that's still being built.
firstChunkID chunks.HeadChunkID // HeadChunkID for mmappedChunks[0] firstChunkID chunks.HeadChunkID // HeadChunkID for mmappedChunks[0]
oooMmappedChunks []*mmappedChunk // Immutable chunks on disk containing OOO samples. ooo *memSeriesOOOFields
oooHeadChunk *oooHeadChunk // Most recent chunk for ooo samples in memory that's still being built.
firstOOOChunkID chunks.HeadChunkID // HeadOOOChunkID for oooMmappedChunks[0]
mmMaxTime int64 // Max time of any mmapped chunk, only used during WAL replay. mmMaxTime int64 // Max time of any mmapped chunk, only used during WAL replay.
@@ -1866,6 +1868,14 @@ type memSeries struct {
pendingCommit bool // Whether there are samples waiting to be committed to this series. pendingCommit bool // Whether there are samples waiting to be committed to this series.
} }
// memSeriesOOOFields contains the fields required by memSeries
// to handle out-of-order data.
type memSeriesOOOFields struct {
oooMmappedChunks []*mmappedChunk // Immutable chunks on disk containing OOO samples.
oooHeadChunk *oooHeadChunk // Most recent chunk for ooo samples in memory that's still being built.
firstOOOChunkID chunks.HeadChunkID // HeadOOOChunkID for oooMmappedChunks[0].
}
func newMemSeries(lset labels.Labels, id chunks.HeadSeriesRef, isolationDisabled bool) *memSeries { func newMemSeries(lset labels.Labels, id chunks.HeadSeriesRef, isolationDisabled bool) *memSeries {
s := &memSeries{ s := &memSeries{
lset: lset, lset: lset,
@@ -1924,15 +1934,19 @@ func (s *memSeries) truncateChunksBefore(mint int64, minOOOMmapRef chunks.ChunkD
} }
var removedOOO int var removedOOO int
if len(s.oooMmappedChunks) > 0 { if s.ooo != nil && len(s.ooo.oooMmappedChunks) > 0 {
for i, c := range s.oooMmappedChunks { for i, c := range s.ooo.oooMmappedChunks {
if c.ref.GreaterThan(minOOOMmapRef) { if c.ref.GreaterThan(minOOOMmapRef) {
break break
} }
removedOOO = i + 1 removedOOO = i + 1
} }
s.oooMmappedChunks = append(s.oooMmappedChunks[:0], s.oooMmappedChunks[removedOOO:]...) s.ooo.oooMmappedChunks = append(s.ooo.oooMmappedChunks[:0], s.ooo.oooMmappedChunks[removedOOO:]...)
s.firstOOOChunkID += chunks.HeadChunkID(removedOOO) s.ooo.firstOOOChunkID += chunks.HeadChunkID(removedOOO)
if len(s.ooo.oooMmappedChunks) == 0 && s.ooo.oooHeadChunk == nil {
s.ooo = nil
}
} }
return removedInOrder + removedOOO return removedInOrder + removedOOO

View file

@@ -1093,7 +1093,10 @@ func (a *headAppender) Commit() (err error) {
// insert is like append, except it inserts. Used for OOO samples. // insert is like append, except it inserts. Used for OOO samples.
func (s *memSeries) insert(t int64, v float64, chunkDiskMapper *chunks.ChunkDiskMapper, oooCapMax int64) (inserted, chunkCreated bool, mmapRef chunks.ChunkDiskMapperRef) { func (s *memSeries) insert(t int64, v float64, chunkDiskMapper *chunks.ChunkDiskMapper, oooCapMax int64) (inserted, chunkCreated bool, mmapRef chunks.ChunkDiskMapperRef) {
c := s.oooHeadChunk if s.ooo == nil {
s.ooo = &memSeriesOOOFields{}
}
c := s.ooo.oooHeadChunk
if c == nil || c.chunk.NumSamples() == int(oooCapMax) { if c == nil || c.chunk.NumSamples() == int(oooCapMax) {
// Note: If no new samples come in then we rely on compaction to clean up stale in-memory OOO chunks. // Note: If no new samples come in then we rely on compaction to clean up stale in-memory OOO chunks.
c, mmapRef = s.cutNewOOOHeadChunk(t, chunkDiskMapper) c, mmapRef = s.cutNewOOOHeadChunk(t, chunkDiskMapper)
@@ -1412,33 +1415,35 @@ func (s *memSeries) cutNewHeadChunk(
return s.headChunk return s.headChunk
} }
// cutNewOOOHeadChunk cuts a new OOO chunk and m-maps the old chunk.
// The caller must ensure that s.ooo is not nil.
func (s *memSeries) cutNewOOOHeadChunk(mint int64, chunkDiskMapper *chunks.ChunkDiskMapper) (*oooHeadChunk, chunks.ChunkDiskMapperRef) { func (s *memSeries) cutNewOOOHeadChunk(mint int64, chunkDiskMapper *chunks.ChunkDiskMapper) (*oooHeadChunk, chunks.ChunkDiskMapperRef) {
ref := s.mmapCurrentOOOHeadChunk(chunkDiskMapper) ref := s.mmapCurrentOOOHeadChunk(chunkDiskMapper)
s.oooHeadChunk = &oooHeadChunk{ s.ooo.oooHeadChunk = &oooHeadChunk{
chunk: NewOOOChunk(), chunk: NewOOOChunk(),
minTime: mint, minTime: mint,
maxTime: math.MinInt64, maxTime: math.MinInt64,
} }
return s.oooHeadChunk, ref return s.ooo.oooHeadChunk, ref
} }
func (s *memSeries) mmapCurrentOOOHeadChunk(chunkDiskMapper *chunks.ChunkDiskMapper) chunks.ChunkDiskMapperRef { func (s *memSeries) mmapCurrentOOOHeadChunk(chunkDiskMapper *chunks.ChunkDiskMapper) chunks.ChunkDiskMapperRef {
if s.oooHeadChunk == nil { if s.ooo == nil || s.ooo.oooHeadChunk == nil {
// There is no head chunk, so nothing to m-map here. // There is no head chunk, so nothing to m-map here.
return 0 return 0
} }
xor, _ := s.oooHeadChunk.chunk.ToXOR() // Encode to XorChunk which is more compact and implements all of the needed functionality. xor, _ := s.ooo.oooHeadChunk.chunk.ToXOR() // Encode to XorChunk which is more compact and implements all of the needed functionality.
oooXor := &chunkenc.OOOXORChunk{XORChunk: xor} oooXor := &chunkenc.OOOXORChunk{XORChunk: xor}
chunkRef := chunkDiskMapper.WriteChunk(s.ref, s.oooHeadChunk.minTime, s.oooHeadChunk.maxTime, oooXor, handleChunkWriteError) chunkRef := chunkDiskMapper.WriteChunk(s.ref, s.ooo.oooHeadChunk.minTime, s.ooo.oooHeadChunk.maxTime, oooXor, handleChunkWriteError)
s.oooMmappedChunks = append(s.oooMmappedChunks, &mmappedChunk{ s.ooo.oooMmappedChunks = append(s.ooo.oooMmappedChunks, &mmappedChunk{
ref: chunkRef, ref: chunkRef,
numSamples: uint16(xor.NumSamples()), numSamples: uint16(xor.NumSamples()),
minTime: s.oooHeadChunk.minTime, minTime: s.ooo.oooHeadChunk.minTime,
maxTime: s.oooHeadChunk.maxTime, maxTime: s.ooo.oooHeadChunk.maxTime,
}) })
s.oooHeadChunk = nil s.ooo.oooHeadChunk = nil
return chunkRef return chunkRef
} }

View file

@@ -196,8 +196,9 @@ func (s *memSeries) headChunkID(pos int) chunks.HeadChunkID {
// oooHeadChunkID returns the HeadChunkID referred to by the given position. // oooHeadChunkID returns the HeadChunkID referred to by the given position.
// * 0 <= pos < len(s.oooMmappedChunks) refer to s.oooMmappedChunks[pos] // * 0 <= pos < len(s.oooMmappedChunks) refer to s.oooMmappedChunks[pos]
// * pos == len(s.oooMmappedChunks) refers to s.oooHeadChunk // * pos == len(s.oooMmappedChunks) refers to s.oooHeadChunk
// The caller must ensure that s.ooo is not nil.
func (s *memSeries) oooHeadChunkID(pos int) chunks.HeadChunkID { func (s *memSeries) oooHeadChunkID(pos int) chunks.HeadChunkID {
return chunks.HeadChunkID(pos) + s.firstOOOChunkID return chunks.HeadChunkID(pos) + s.ooo.firstOOOChunkID
} }
// LabelValueFor returns label value for the given label name in the series referred to by ID. // LabelValueFor returns label value for the given label name in the series referred to by ID.
@@ -349,6 +350,7 @@ func (s *memSeries) chunk(id chunks.HeadChunkID, chunkDiskMapper *chunks.ChunkDi
// might be a merge of all the overlapping chunks, if any, amongst all the // might be a merge of all the overlapping chunks, if any, amongst all the
// chunks in the OOOHead. // chunks in the OOOHead.
// This function is not thread safe unless the caller holds a lock. // This function is not thread safe unless the caller holds a lock.
// The caller must ensure that s.ooo is not nil.
func (s *memSeries) oooMergedChunk(meta chunks.Meta, cdm *chunks.ChunkDiskMapper, mint, maxt int64) (chunk *mergedOOOChunks, err error) { func (s *memSeries) oooMergedChunk(meta chunks.Meta, cdm *chunks.ChunkDiskMapper, mint, maxt int64) (chunk *mergedOOOChunks, err error) {
_, cid := chunks.HeadChunkRef(meta.Ref).Unpack() _, cid := chunks.HeadChunkRef(meta.Ref).Unpack()
@@ -356,23 +358,23 @@ func (s *memSeries) oooMergedChunk(meta chunks.Meta, cdm *chunks.ChunkDiskMapper
// incremented by 1 when new chunk is created, hence (meta - firstChunkID) gives the slice index. // incremented by 1 when new chunk is created, hence (meta - firstChunkID) gives the slice index.
// The max index for the s.mmappedChunks slice can be len(s.mmappedChunks)-1, hence if the ix // The max index for the s.mmappedChunks slice can be len(s.mmappedChunks)-1, hence if the ix
// is len(s.mmappedChunks), it represents the next chunk, which is the head chunk. // is len(s.mmappedChunks), it represents the next chunk, which is the head chunk.
ix := int(cid) - int(s.firstOOOChunkID) ix := int(cid) - int(s.ooo.firstOOOChunkID)
if ix < 0 || ix > len(s.oooMmappedChunks) { if ix < 0 || ix > len(s.ooo.oooMmappedChunks) {
return nil, storage.ErrNotFound return nil, storage.ErrNotFound
} }
if ix == len(s.oooMmappedChunks) { if ix == len(s.ooo.oooMmappedChunks) {
if s.oooHeadChunk == nil { if s.ooo.oooHeadChunk == nil {
return nil, errors.New("invalid ooo head chunk") return nil, errors.New("invalid ooo head chunk")
} }
} }
// We create a temporary slice of chunk metas to hold the information of all // We create a temporary slice of chunk metas to hold the information of all
// possible chunks that may overlap with the requested chunk. // possible chunks that may overlap with the requested chunk.
tmpChks := make([]chunkMetaAndChunkDiskMapperRef, 0, len(s.oooMmappedChunks)) tmpChks := make([]chunkMetaAndChunkDiskMapperRef, 0, len(s.ooo.oooMmappedChunks))
oooHeadRef := chunks.ChunkRef(chunks.NewHeadChunkRef(s.ref, s.oooHeadChunkID(len(s.oooMmappedChunks)))) oooHeadRef := chunks.ChunkRef(chunks.NewHeadChunkRef(s.ref, s.oooHeadChunkID(len(s.ooo.oooMmappedChunks))))
if s.oooHeadChunk != nil && s.oooHeadChunk.OverlapsClosedInterval(mint, maxt) { if s.ooo.oooHeadChunk != nil && s.ooo.oooHeadChunk.OverlapsClosedInterval(mint, maxt) {
// We only want to append the head chunk if this chunk existed when // We only want to append the head chunk if this chunk existed when
// Series() was called. This brings consistency in case new data // Series() was called. This brings consistency in case new data
// is added in between Series() and Chunk() calls. // is added in between Series() and Chunk() calls.
@@ -388,7 +390,7 @@ func (s *memSeries) oooMergedChunk(meta chunks.Meta, cdm *chunks.ChunkDiskMapper
} }
} }
for i, c := range s.oooMmappedChunks { for i, c := range s.ooo.oooMmappedChunks {
chunkRef := chunks.ChunkRef(chunks.NewHeadChunkRef(s.ref, s.oooHeadChunkID(i))) chunkRef := chunks.ChunkRef(chunks.NewHeadChunkRef(s.ref, s.oooHeadChunkID(i)))
// We can skip chunks that came in later than the last known OOOLastRef. // We can skip chunks that came in later than the last known OOOLastRef.
if chunkRef > meta.OOOLastRef { if chunkRef > meta.OOOLastRef {
@@ -433,11 +435,11 @@ func (s *memSeries) oooMergedChunk(meta chunks.Meta, cdm *chunks.ChunkDiskMapper
// If head chunk min and max time match the meta OOO markers // If head chunk min and max time match the meta OOO markers
// that means that the chunk has not expanded so we can append // that means that the chunk has not expanded so we can append
// it as it is. // it as it is.
if s.oooHeadChunk.minTime == meta.OOOLastMinTime && s.oooHeadChunk.maxTime == meta.OOOLastMaxTime { if s.ooo.oooHeadChunk.minTime == meta.OOOLastMinTime && s.ooo.oooHeadChunk.maxTime == meta.OOOLastMaxTime {
xor, err = s.oooHeadChunk.chunk.ToXOR() // TODO(jesus.vazquez) (This is an optimization idea that has no priority and might not be that useful) See if we could use a copy of the underlying slice. That would leave the more expensive ToXOR() function only for the usecase where Bytes() is called. xor, err = s.ooo.oooHeadChunk.chunk.ToXOR() // TODO(jesus.vazquez) (This is an optimization idea that has no priority and might not be that useful) See if we could use a copy of the underlying slice. That would leave the more expensive ToXOR() function only for the usecase where Bytes() is called.
} else { } else {
// We need to remove samples that are outside of the markers // We need to remove samples that are outside of the markers
xor, err = s.oooHeadChunk.chunk.ToXORBetweenTimestamps(meta.OOOLastMinTime, meta.OOOLastMaxTime) xor, err = s.ooo.oooHeadChunk.chunk.ToXORBetweenTimestamps(meta.OOOLastMinTime, meta.OOOLastMaxTime)
} }
if err != nil { if err != nil {
return nil, errors.Wrap(err, "failed to convert ooo head chunk to xor chunk") return nil, errors.Wrap(err, "failed to convert ooo head chunk to xor chunk")

View file

@@ -4008,7 +4008,7 @@ func TestOOOWalReplay(t *testing.T) {
require.False(t, ok) require.False(t, ok)
require.NotNil(t, ms) require.NotNil(t, ms)
xor, err := ms.oooHeadChunk.chunk.ToXOR() xor, err := ms.ooo.oooHeadChunk.chunk.ToXOR()
require.NoError(t, err) require.NoError(t, err)
it := xor.Iterator(nil) it := xor.Iterator(nil)
@@ -4068,16 +4068,16 @@ func TestOOOMmapReplay(t *testing.T) {
require.False(t, ok) require.False(t, ok)
require.NotNil(t, ms) require.NotNil(t, ms)
require.Len(t, ms.oooMmappedChunks, 3) require.Len(t, ms.ooo.oooMmappedChunks, 3)
// Verify that we can access the chunks without error. // Verify that we can access the chunks without error.
for _, m := range ms.oooMmappedChunks { for _, m := range ms.ooo.oooMmappedChunks {
chk, err := h.chunkDiskMapper.Chunk(m.ref) chk, err := h.chunkDiskMapper.Chunk(m.ref)
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, int(m.numSamples), chk.NumSamples()) require.Equal(t, int(m.numSamples), chk.NumSamples())
} }
expMmapChunks := make([]*mmappedChunk, 3) expMmapChunks := make([]*mmappedChunk, 3)
copy(expMmapChunks, ms.oooMmappedChunks) copy(expMmapChunks, ms.ooo.oooMmappedChunks)
// Restart head. // Restart head.
require.NoError(t, h.Close()) require.NoError(t, h.Close())
@@ -4096,16 +4096,16 @@ func TestOOOMmapReplay(t *testing.T) {
require.False(t, ok) require.False(t, ok)
require.NotNil(t, ms) require.NotNil(t, ms)
require.Len(t, ms.oooMmappedChunks, len(expMmapChunks)) require.Len(t, ms.ooo.oooMmappedChunks, len(expMmapChunks))
// Verify that we can access the chunks without error. // Verify that we can access the chunks without error.
for _, m := range ms.oooMmappedChunks { for _, m := range ms.ooo.oooMmappedChunks {
chk, err := h.chunkDiskMapper.Chunk(m.ref) chk, err := h.chunkDiskMapper.Chunk(m.ref)
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, int(m.numSamples), chk.NumSamples()) require.Equal(t, int(m.numSamples), chk.NumSamples())
} }
actMmapChunks := make([]*mmappedChunk, len(expMmapChunks)) actMmapChunks := make([]*mmappedChunk, len(expMmapChunks))
copy(actMmapChunks, ms.oooMmappedChunks) copy(actMmapChunks, ms.ooo.oooMmappedChunks)
require.Equal(t, expMmapChunks, actMmapChunks) require.Equal(t, expMmapChunks, actMmapChunks)
@@ -4500,8 +4500,8 @@ func TestOOOAppendWithNoSeries(t *testing.T) {
require.NotNil(t, ms) require.NotNil(t, ms)
require.Nil(t, ms.headChunk) require.Nil(t, ms.headChunk)
require.NotNil(t, ms.oooHeadChunk) require.NotNil(t, ms.ooo.oooHeadChunk)
require.Equal(t, expSamples, ms.oooHeadChunk.chunk.NumSamples()) require.Equal(t, expSamples, ms.ooo.oooHeadChunk.chunk.NumSamples())
} }
verifyInOrderSamples := func(lbls labels.Labels, expSamples int) { verifyInOrderSamples := func(lbls labels.Labels, expSamples int) {
@@ -4510,7 +4510,7 @@ func TestOOOAppendWithNoSeries(t *testing.T) {
require.False(t, created) require.False(t, created)
require.NotNil(t, ms) require.NotNil(t, ms)
require.Nil(t, ms.oooHeadChunk) require.Nil(t, ms.ooo)
require.NotNil(t, ms.headChunk) require.NotNil(t, ms.headChunk)
require.Equal(t, expSamples, ms.headChunk.chunk.NumSamples()) require.Equal(t, expSamples, ms.headChunk.chunk.NumSamples())
} }

View file

@@ -499,7 +499,15 @@ func (h *Head) resetSeriesWithMMappedChunks(mSeries *memSeries, mmc, oooMmc []*m
h.metrics.chunksRemoved.Add(float64(len(mSeries.mmappedChunks))) h.metrics.chunksRemoved.Add(float64(len(mSeries.mmappedChunks)))
h.metrics.chunks.Add(float64(len(mmc) + len(oooMmc) - len(mSeries.mmappedChunks))) h.metrics.chunks.Add(float64(len(mmc) + len(oooMmc) - len(mSeries.mmappedChunks)))
mSeries.mmappedChunks = mmc mSeries.mmappedChunks = mmc
mSeries.oooMmappedChunks = oooMmc mSeries.ooo = nil
if len(oooMmc) == 0 {
mSeries.ooo = nil
} else {
if mSeries.ooo == nil {
mSeries.ooo = &memSeriesOOOFields{}
}
*mSeries.ooo = memSeriesOOOFields{oooMmappedChunks: oooMmc}
}
// Cache the last mmapped chunk time, so we can skip calling append() for samples it will reject. // Cache the last mmapped chunk time, so we can skip calling append() for samples it will reject.
if len(mmc) == 0 { if len(mmc) == 0 {
mSeries.mmMaxTime = math.MinInt64 mSeries.mmMaxTime = math.MinInt64
@@ -816,7 +824,9 @@ func (h *Head) loadWBL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.
// chunk size parameters, we are not taking care of that here. // chunk size parameters, we are not taking care of that here.
// TODO(codesome): see if there is a way to avoid duplicate m-map chunks if // TODO(codesome): see if there is a way to avoid duplicate m-map chunks if
// the size of ooo chunk was reduced between restart. // the size of ooo chunk was reduced between restart.
ms.oooHeadChunk = nil if ms.ooo != nil {
ms.ooo.oooHeadChunk = nil
}
processors[idx].mx.Unlock() processors[idx].mx.Unlock()
} }

View file

@@ -71,7 +71,11 @@ func (oh *OOOHeadIndexReader) series(ref storage.SeriesRef, builder *labels.Scra
defer s.Unlock() defer s.Unlock()
*chks = (*chks)[:0] *chks = (*chks)[:0]
tmpChks := make([]chunks.Meta, 0, len(s.oooMmappedChunks)) if s.ooo == nil {
return nil
}
tmpChks := make([]chunks.Meta, 0, len(s.ooo.oooMmappedChunks))
// We define these markers to track the last chunk reference while we // We define these markers to track the last chunk reference while we
// fill the chunk meta. // fill the chunk meta.
@@ -103,15 +107,15 @@
// Collect all chunks that overlap the query range, in order from most recent to most old, // Collect all chunks that overlap the query range, in order from most recent to most old,
// so we can set the correct markers. // so we can set the correct markers.
if s.oooHeadChunk != nil { if s.ooo.oooHeadChunk != nil {
c := s.oooHeadChunk c := s.ooo.oooHeadChunk
if c.OverlapsClosedInterval(oh.mint, oh.maxt) && lastMmapRef == 0 { if c.OverlapsClosedInterval(oh.mint, oh.maxt) && lastMmapRef == 0 {
ref := chunks.ChunkRef(chunks.NewHeadChunkRef(s.ref, s.oooHeadChunkID(len(s.oooMmappedChunks)))) ref := chunks.ChunkRef(chunks.NewHeadChunkRef(s.ref, s.oooHeadChunkID(len(s.ooo.oooMmappedChunks))))
addChunk(c.minTime, c.maxTime, ref) addChunk(c.minTime, c.maxTime, ref)
} }
} }
for i := len(s.oooMmappedChunks) - 1; i >= 0; i-- { for i := len(s.ooo.oooMmappedChunks) - 1; i >= 0; i-- {
c := s.oooMmappedChunks[i] c := s.ooo.oooMmappedChunks[i]
if c.OverlapsClosedInterval(oh.mint, oh.maxt) && (lastMmapRef == 0 || lastMmapRef.GreaterThanOrEqualTo(c.ref)) { if c.OverlapsClosedInterval(oh.mint, oh.maxt) && (lastMmapRef == 0 || lastMmapRef.GreaterThanOrEqualTo(c.ref)) {
ref := chunks.ChunkRef(chunks.NewHeadChunkRef(s.ref, s.oooHeadChunkID(i))) ref := chunks.ChunkRef(chunks.NewHeadChunkRef(s.ref, s.oooHeadChunkID(i)))
addChunk(c.minTime, c.maxTime, ref) addChunk(c.minTime, c.maxTime, ref)
@@ -232,6 +236,11 @@ func (cr OOOHeadChunkReader) Chunk(meta chunks.Meta) (chunkenc.Chunk, error) {
} }
s.Lock() s.Lock()
if s.ooo == nil {
// There is no OOO data for this series.
s.Unlock()
return nil, storage.ErrNotFound
}
c, err := s.oooMergedChunk(meta, cr.head.chunkDiskMapper, cr.mint, cr.maxt) c, err := s.oooMergedChunk(meta, cr.head.chunkDiskMapper, cr.mint, cr.maxt)
s.Unlock() s.Unlock()
if err != nil { if err != nil {
@@ -302,18 +311,23 @@ func NewOOOCompactionHead(head *Head) (*OOOCompactionHead, error) {
// TODO: consider having a lock specifically for ooo data. // TODO: consider having a lock specifically for ooo data.
ms.Lock() ms.Lock()
if ms.ooo == nil {
ms.Unlock()
continue
}
mmapRef := ms.mmapCurrentOOOHeadChunk(head.chunkDiskMapper) mmapRef := ms.mmapCurrentOOOHeadChunk(head.chunkDiskMapper)
if mmapRef == 0 && len(ms.oooMmappedChunks) > 0 { if mmapRef == 0 && len(ms.ooo.oooMmappedChunks) > 0 {
// Nothing was m-mapped. So take the mmapRef from the existing slice if it exists. // Nothing was m-mapped. So take the mmapRef from the existing slice if it exists.
mmapRef = ms.oooMmappedChunks[len(ms.oooMmappedChunks)-1].ref mmapRef = ms.ooo.oooMmappedChunks[len(ms.ooo.oooMmappedChunks)-1].ref
} }
seq, off := mmapRef.Unpack() seq, off := mmapRef.Unpack()
if seq > lastSeq || (seq == lastSeq && off > lastOff) { if seq > lastSeq || (seq == lastSeq && off > lastOff) {
ch.lastMmapRef, lastSeq, lastOff = mmapRef, seq, off ch.lastMmapRef, lastSeq, lastOff = mmapRef, seq, off
} }
if len(ms.oooMmappedChunks) > 0 { if len(ms.ooo.oooMmappedChunks) > 0 {
ch.postings = append(ch.postings, seriesRef) ch.postings = append(ch.postings, seriesRef)
for _, c := range ms.oooMmappedChunks { for _, c := range ms.ooo.oooMmappedChunks {
if c.minTime < ch.mint { if c.minTime < ch.mint {
ch.mint = c.minTime ch.mint = c.minTime
} }

View file

@@ -301,6 +301,7 @@ func TestOOOHeadIndexReader_Series(t *testing.T) {
require.NoError(t, h.Init(0)) require.NoError(t, h.Init(0))
s1, _, _ := h.getOrCreate(s1ID, s1Lset) s1, _, _ := h.getOrCreate(s1ID, s1Lset)
s1.ooo = &memSeriesOOOFields{}
var lastChunk chunkInterval var lastChunk chunkInterval
var lastChunkPos int var lastChunkPos int
@@ -340,7 +341,7 @@ func TestOOOHeadIndexReader_Series(t *testing.T) {
if headChunk && len(intervals) > 0 { if headChunk && len(intervals) > 0 {
// Put the last interval in the head chunk // Put the last interval in the head chunk
s1.oooHeadChunk = &oooHeadChunk{ s1.ooo.oooHeadChunk = &oooHeadChunk{
minTime: intervals[len(intervals)-1].mint, minTime: intervals[len(intervals)-1].mint,
maxTime: intervals[len(intervals)-1].maxt, maxTime: intervals[len(intervals)-1].maxt,
} }
@@ -348,7 +349,7 @@ func TestOOOHeadIndexReader_Series(t *testing.T) {
} }
for _, ic := range intervals { for _, ic := range intervals {
s1.oooMmappedChunks = append(s1.oooMmappedChunks, &mmappedChunk{ s1.ooo.oooMmappedChunks = append(s1.ooo.oooMmappedChunks, &mmappedChunk{
minTime: ic.mint, minTime: ic.mint,
maxTime: ic.maxt, maxTime: ic.maxt,
}) })