Fix: getting rid of EncOOOXOR chunk encoding (#12111)

Signed-off-by: mabhi <abhijit.mukherjee@infracloud.io>
This commit is contained in:
Abhijit Mukherjee 2023-03-16 15:53:47 +05:30 committed by GitHub
parent 58a8d526e8
commit 8f6d5dcd45
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
9 changed files with 70 additions and 59 deletions

View file

@ -47,20 +47,9 @@ func (e Encoding) String() string {
return "<unknown>"
}
// Chunk encodings for out-of-order chunks.
// These encodings must be only used by the Head block for its internal bookkeeping.
const (
OutOfOrderMask = 0b10000000
EncOOOXOR = EncXOR | OutOfOrderMask
)
func IsOutOfOrderChunk(e Encoding) bool {
return (e & OutOfOrderMask) != 0
}
// IsValidEncoding returns true for supported encodings.
func IsValidEncoding(e Encoding) bool {
return e == EncXOR || e == EncOOOXOR || e == EncHistogram || e == EncFloatHistogram
return e == EncXOR || e == EncHistogram || e == EncFloatHistogram
}
// Chunk holds a sequence of sample pairs that can be iterated over and appended to.
@ -262,7 +251,7 @@ func NewPool() Pool {
func (p *pool) Get(e Encoding, b []byte) (Chunk, error) {
switch e {
case EncXOR, EncOOOXOR:
case EncXOR:
c := p.xor.Get().(*XORChunk)
c.b.stream = b
c.b.count = 0
@ -283,7 +272,7 @@ func (p *pool) Get(e Encoding, b []byte) (Chunk, error) {
func (p *pool) Put(c Chunk) error {
switch c.Encoding() {
case EncXOR, EncOOOXOR:
case EncXOR:
xc, ok := c.(*XORChunk)
// This may happen often with wrapped chunks. Nothing we can really do about
// it but returning an error would cause a lot of allocations again. Thus,
@ -327,7 +316,7 @@ func (p *pool) Put(c Chunk) error {
// bytes.
func FromData(e Encoding, d []byte) (Chunk, error) {
switch e {
case EncXOR, EncOOOXOR:
case EncXOR:
return &XORChunk{b: bstream{count: 0, stream: d}}, nil
case EncHistogram:
return &HistogramChunk{b: bstream{count: 0, stream: d}}, nil

View file

@ -506,12 +506,3 @@ func xorRead(br *bstreamReader, value *float64, leading, trailing *uint8) error
*value = math.Float64frombits(vbits)
return nil
}
// OOOXORChunk holds a XORChunk and overrides the Encoding() method.
type OOOXORChunk struct {
*XORChunk
}
func (c *OOOXORChunk) Encoding() Encoding {
return EncOOOXOR
}

View file

@ -42,6 +42,7 @@ type chunkWriteJob struct {
maxt int64
chk chunkenc.Chunk
ref ChunkDiskMapperRef
isOOO bool
callback func(error)
}
@ -76,7 +77,7 @@ type chunkWriteQueue struct {
}
// writeChunkF is a function which writes chunks, it is dynamic to allow mocking in tests.
type writeChunkF func(HeadSeriesRef, int64, int64, chunkenc.Chunk, ChunkDiskMapperRef, bool) error
type writeChunkF func(HeadSeriesRef, int64, int64, chunkenc.Chunk, ChunkDiskMapperRef, bool, bool) error
func newChunkWriteQueue(reg prometheus.Registerer, size int, writeChunk writeChunkF) *chunkWriteQueue {
counters := prometheus.NewCounterVec(
@ -133,7 +134,7 @@ func (c *chunkWriteQueue) start() {
}
func (c *chunkWriteQueue) processJob(job chunkWriteJob) {
err := c.writeChunk(job.seriesRef, job.mint, job.maxt, job.chk, job.ref, job.cutFile)
err := c.writeChunk(job.seriesRef, job.mint, job.maxt, job.chk, job.ref, job.isOOO, job.cutFile)
if job.callback != nil {
job.callback(err)
}

View file

@ -31,7 +31,7 @@ func TestChunkWriteQueue_GettingChunkFromQueue(t *testing.T) {
blockWriterWg.Add(1)
// blockingChunkWriter blocks until blockWriterWg is done.
blockingChunkWriter := func(_ HeadSeriesRef, _, _ int64, _ chunkenc.Chunk, _ ChunkDiskMapperRef, _ bool) error {
blockingChunkWriter := func(_ HeadSeriesRef, _, _ int64, _ chunkenc.Chunk, _ ChunkDiskMapperRef, _, _ bool) error {
blockWriterWg.Wait()
return nil
}
@ -63,7 +63,7 @@ func TestChunkWriteQueue_WritingThroughQueue(t *testing.T) {
gotCutFile bool
)
blockingChunkWriter := func(seriesRef HeadSeriesRef, mint, maxt int64, chunk chunkenc.Chunk, ref ChunkDiskMapperRef, cutFile bool) error {
blockingChunkWriter := func(seriesRef HeadSeriesRef, mint, maxt int64, chunk chunkenc.Chunk, ref ChunkDiskMapperRef, isOOO, cutFile bool) error {
gotSeriesRef = seriesRef
gotMint = mint
gotMaxt = maxt
@ -101,7 +101,7 @@ func TestChunkWriteQueue_WrappingAroundSizeLimit(t *testing.T) {
unblockChunkWriterCh := make(chan struct{}, sizeLimit)
// blockingChunkWriter blocks until the unblockChunkWriterCh channel returns a value.
blockingChunkWriter := func(seriesRef HeadSeriesRef, mint, maxt int64, chunk chunkenc.Chunk, ref ChunkDiskMapperRef, cutFile bool) error {
blockingChunkWriter := func(seriesRef HeadSeriesRef, mint, maxt int64, chunk chunkenc.Chunk, ref ChunkDiskMapperRef, isOOO, cutFile bool) error {
<-unblockChunkWriterCh
return nil
}
@ -184,7 +184,7 @@ func TestChunkWriteQueue_WrappingAroundSizeLimit(t *testing.T) {
func TestChunkWriteQueue_HandlerErrorViaCallback(t *testing.T) {
testError := errors.New("test error")
chunkWriter := func(_ HeadSeriesRef, _, _ int64, _ chunkenc.Chunk, _ ChunkDiskMapperRef, _ bool) error {
chunkWriter := func(_ HeadSeriesRef, _, _ int64, _ chunkenc.Chunk, _ ChunkDiskMapperRef, _, _ bool) error {
return testError
}
@ -212,7 +212,7 @@ func BenchmarkChunkWriteQueue_addJob(b *testing.B) {
for _, concurrentWrites := range []int{1, 10, 100, 1000} {
b.Run(fmt.Sprintf("%d concurrent writes", concurrentWrites), func(b *testing.B) {
issueReadSignal := make(chan struct{})
q := newChunkWriteQueue(nil, 1000, func(ref HeadSeriesRef, i, i2 int64, chunk chunkenc.Chunk, ref2 ChunkDiskMapperRef, b bool) error {
q := newChunkWriteQueue(nil, 1000, func(ref HeadSeriesRef, i, i2 int64, chunk chunkenc.Chunk, ref2 ChunkDiskMapperRef, ooo, b bool) error {
if withReads {
select {
case issueReadSignal <- struct{}{}:

View file

@ -273,6 +273,26 @@ func NewChunkDiskMapper(reg prometheus.Registerer, dir string, pool chunkenc.Poo
return m, m.openMMapFiles()
}
// Chunk encodings for out-of-order chunks.
// These encodings must be only used by the Head block for its internal bookkeeping.
const (
OutOfOrderMask = uint8(0b10000000)
)
func (cdm *ChunkDiskMapper) ApplyOutOfOrderMask(sourceEncoding chunkenc.Encoding) chunkenc.Encoding {
enc := uint8(sourceEncoding) | OutOfOrderMask
return chunkenc.Encoding(enc)
}
func (cdm *ChunkDiskMapper) IsOutOfOrderChunk(e chunkenc.Encoding) bool {
return (uint8(e) & OutOfOrderMask) != 0
}
func (cdm *ChunkDiskMapper) RemoveMasks(sourceEncoding chunkenc.Encoding) chunkenc.Encoding {
restored := uint8(sourceEncoding) & (^OutOfOrderMask)
return chunkenc.Encoding(restored)
}
// openMMapFiles opens all files within dir for mmapping.
func (cdm *ChunkDiskMapper) openMMapFiles() (returnErr error) {
cdm.mmappedChunkFiles = map[int]*mmappedChunkFile{}
@ -403,17 +423,17 @@ func repairLastChunkFile(files map[int]string) (_ map[int]string, returnErr erro
// WriteChunk writes the chunk to the disk.
// The returned chunk ref is the reference from where the chunk encoding starts for the chunk.
func (cdm *ChunkDiskMapper) WriteChunk(seriesRef HeadSeriesRef, mint, maxt int64, chk chunkenc.Chunk, callback func(err error)) (chkRef ChunkDiskMapperRef) {
func (cdm *ChunkDiskMapper) WriteChunk(seriesRef HeadSeriesRef, mint, maxt int64, chk chunkenc.Chunk, isOOO bool, callback func(err error)) (chkRef ChunkDiskMapperRef) {
// cdm.evtlPosMtx must be held to serialize the calls to cdm.evtlPos.getNextChunkRef() and the writing of the chunk (either with or without queue).
cdm.evtlPosMtx.Lock()
defer cdm.evtlPosMtx.Unlock()
ref, cutFile := cdm.evtlPos.getNextChunkRef(chk)
if cdm.writeQueue != nil {
return cdm.writeChunkViaQueue(ref, cutFile, seriesRef, mint, maxt, chk, callback)
return cdm.writeChunkViaQueue(ref, isOOO, cutFile, seriesRef, mint, maxt, chk, callback)
}
err := cdm.writeChunk(seriesRef, mint, maxt, chk, ref, cutFile)
err := cdm.writeChunk(seriesRef, mint, maxt, chk, ref, isOOO, cutFile)
if callback != nil {
callback(err)
}
@ -421,7 +441,7 @@ func (cdm *ChunkDiskMapper) WriteChunk(seriesRef HeadSeriesRef, mint, maxt int64
return ref
}
func (cdm *ChunkDiskMapper) writeChunkViaQueue(ref ChunkDiskMapperRef, cutFile bool, seriesRef HeadSeriesRef, mint, maxt int64, chk chunkenc.Chunk, callback func(err error)) (chkRef ChunkDiskMapperRef) {
func (cdm *ChunkDiskMapper) writeChunkViaQueue(ref ChunkDiskMapperRef, isOOO, cutFile bool, seriesRef HeadSeriesRef, mint, maxt int64, chk chunkenc.Chunk, callback func(err error)) (chkRef ChunkDiskMapperRef) {
var err error
if callback != nil {
defer func() {
@ -438,13 +458,14 @@ func (cdm *ChunkDiskMapper) writeChunkViaQueue(ref ChunkDiskMapperRef, cutFile b
maxt: maxt,
chk: chk,
ref: ref,
isOOO: isOOO,
callback: callback,
})
return ref
}
func (cdm *ChunkDiskMapper) writeChunk(seriesRef HeadSeriesRef, mint, maxt int64, chk chunkenc.Chunk, ref ChunkDiskMapperRef, cutFile bool) (err error) {
func (cdm *ChunkDiskMapper) writeChunk(seriesRef HeadSeriesRef, mint, maxt int64, chk chunkenc.Chunk, ref ChunkDiskMapperRef, isOOO, cutFile bool) (err error) {
cdm.writePathMtx.Lock()
defer cdm.writePathMtx.Unlock()
@ -476,7 +497,11 @@ func (cdm *ChunkDiskMapper) writeChunk(seriesRef HeadSeriesRef, mint, maxt int64
bytesWritten += MintMaxtSize
binary.BigEndian.PutUint64(cdm.byteBuf[bytesWritten:], uint64(maxt))
bytesWritten += MintMaxtSize
cdm.byteBuf[bytesWritten] = byte(chk.Encoding())
enc := chk.Encoding()
if isOOO {
enc = cdm.ApplyOutOfOrderMask(enc)
}
cdm.byteBuf[bytesWritten] = byte(enc)
bytesWritten += ChunkEncodingSize
n := binary.PutUvarint(cdm.byteBuf[bytesWritten:], uint64(len(chk.Bytes())))
bytesWritten += n
@ -696,7 +721,9 @@ func (cdm *ChunkDiskMapper) Chunk(ref ChunkDiskMapperRef) (chunkenc.Chunk, error
// Encoding.
chkEnc := mmapFile.byteSlice.Range(chkStart, chkStart+ChunkEncodingSize)[0]
sourceChkEnc := chunkenc.Encoding(chkEnc)
// Extract the encoding from the byte. ChunkDiskMapper uses only the last 7 bits for the encoding.
chkEnc = byte(cdm.RemoveMasks(sourceChkEnc))
// Data length.
// With the minimum chunk length this should never cause us reading
// over the end of the slice.
@ -762,7 +789,7 @@ func (cdm *ChunkDiskMapper) Chunk(ref ChunkDiskMapperRef) (chunkenc.Chunk, error
// and runs the provided function with information about each chunk. It returns on the first error encountered.
// NOTE: This method needs to be called at least once after creating ChunkDiskMapper
// to set the maxt of all the file.
func (cdm *ChunkDiskMapper) IterateAllChunks(f func(seriesRef HeadSeriesRef, chunkRef ChunkDiskMapperRef, mint, maxt int64, numSamples uint16, encoding chunkenc.Encoding) error) (err error) {
func (cdm *ChunkDiskMapper) IterateAllChunks(f func(seriesRef HeadSeriesRef, chunkRef ChunkDiskMapperRef, mint, maxt int64, numSamples uint16, encoding chunkenc.Encoding, isOOO bool) error) (err error) {
cdm.writePathMtx.Lock()
defer cdm.writePathMtx.Unlock()
@ -860,8 +887,10 @@ func (cdm *ChunkDiskMapper) IterateAllChunks(f func(seriesRef HeadSeriesRef, chu
if maxt > mmapFile.maxt {
mmapFile.maxt = maxt
}
if err := f(seriesRef, chunkRef, mint, maxt, numSamples, chkEnc); err != nil {
isOOO := cdm.IsOutOfOrderChunk(chkEnc)
// Extract the encoding from the byte. ChunkDiskMapper uses only the last 7 bits for the encoding.
chkEnc = cdm.RemoveMasks(chkEnc)
if err := f(seriesRef, chunkRef, mint, maxt, numSamples, chkEnc, isOOO); err != nil {
if cerr, ok := err.(*CorruptionErr); ok {
cerr.Dir = cdm.dir.Name()
cerr.FileIndex = segID

View file

@ -98,7 +98,11 @@ func TestChunkDiskMapper_WriteChunk_Chunk_IterateChunks(t *testing.T) {
bytesWritten += MintMaxtSize
binary.BigEndian.PutUint64(buf[bytesWritten:], uint64(maxt))
bytesWritten += MintMaxtSize
buf[bytesWritten] = byte(chunk.Encoding())
enc := chunk.Encoding()
if isOOO {
enc = hrw.ApplyOutOfOrderMask(enc)
}
buf[bytesWritten] = byte(enc)
bytesWritten += ChunkEncodingSize
n := binary.PutUvarint(buf[bytesWritten:], uint64(len(chunk.Bytes())))
bytesWritten += n
@ -149,7 +153,7 @@ func TestChunkDiskMapper_WriteChunk_Chunk_IterateChunks(t *testing.T) {
hrw = createChunkDiskMapper(t, dir)
idx := 0
require.NoError(t, hrw.IterateAllChunks(func(seriesRef HeadSeriesRef, chunkRef ChunkDiskMapperRef, mint, maxt int64, numSamples uint16, encoding chunkenc.Encoding) error {
require.NoError(t, hrw.IterateAllChunks(func(seriesRef HeadSeriesRef, chunkRef ChunkDiskMapperRef, mint, maxt int64, numSamples uint16, encoding chunkenc.Encoding, isOOO bool) error {
t.Helper()
expData := expectedData[idx]
@ -158,7 +162,7 @@ func TestChunkDiskMapper_WriteChunk_Chunk_IterateChunks(t *testing.T) {
require.Equal(t, expData.maxt, maxt)
require.Equal(t, expData.maxt, maxt)
require.Equal(t, expData.numSamples, numSamples)
require.Equal(t, expData.isOOO, chunkenc.IsOutOfOrderChunk(encoding))
require.Equal(t, expData.isOOO, isOOO)
actChunk, err := hrw.Chunk(expData.chunkRef)
require.NoError(t, err)
@ -188,7 +192,7 @@ func TestChunkDiskMapper_Truncate(t *testing.T) {
mint, maxt := timeRange+1, timeRange+step-1
var err error
awaitCb := make(chan struct{})
hrw.WriteChunk(1, int64(mint), int64(maxt), randomChunk(t), func(cbErr error) {
hrw.WriteChunk(1, int64(mint), int64(maxt), randomChunk(t), false, func(cbErr error) {
err = cbErr
close(awaitCb)
})
@ -282,7 +286,7 @@ func TestChunkDiskMapper_Truncate_PreservesFileSequence(t *testing.T) {
step := 100
mint, maxt := timeRange+1, timeRange+step-1
hrw.WriteChunk(1, int64(mint), int64(maxt), randomChunk(t), func(err error) {
hrw.WriteChunk(1, int64(mint), int64(maxt), randomChunk(t), false, func(err error) {
close(awaitCb)
require.NoError(t, err)
})
@ -363,7 +367,7 @@ func TestHeadReadWriter_TruncateAfterFailedIterateChunks(t *testing.T) {
// Write a chunks to iterate on it later.
var err error
awaitCb := make(chan struct{})
hrw.WriteChunk(1, 0, 1000, randomChunk(t), func(cbErr error) {
hrw.WriteChunk(1, 0, 1000, randomChunk(t), false, func(cbErr error) {
err = cbErr
close(awaitCb)
})
@ -377,7 +381,7 @@ func TestHeadReadWriter_TruncateAfterFailedIterateChunks(t *testing.T) {
hrw = createChunkDiskMapper(t, dir)
// Forcefully failing IterateAllChunks.
require.Error(t, hrw.IterateAllChunks(func(_ HeadSeriesRef, _ ChunkDiskMapperRef, _, _ int64, _ uint16, _ chunkenc.Encoding) error {
require.Error(t, hrw.IterateAllChunks(func(_ HeadSeriesRef, _ ChunkDiskMapperRef, _, _ int64, _ uint16, _ chunkenc.Encoding, _ bool) error {
return errors.New("random error")
}))
@ -396,7 +400,7 @@ func TestHeadReadWriter_ReadRepairOnEmptyLastFile(t *testing.T) {
mint, maxt := timeRange+1, timeRange+step-1
var err error
awaitCb := make(chan struct{})
hrw.WriteChunk(1, int64(mint), int64(maxt), randomChunk(t), func(cbErr error) {
hrw.WriteChunk(1, int64(mint), int64(maxt), randomChunk(t), false, func(cbErr error) {
err = cbErr
close(awaitCb)
})
@ -489,7 +493,7 @@ func createChunkDiskMapper(t *testing.T, dir string) *ChunkDiskMapper {
hrw, err := NewChunkDiskMapper(nil, dir, chunkenc.NewPool(), DefaultWriteBufferSize, writeQueueSize)
require.NoError(t, err)
require.False(t, hrw.fileMaxtSet)
require.NoError(t, hrw.IterateAllChunks(func(_ HeadSeriesRef, _ ChunkDiskMapperRef, _, _ int64, _ uint16, _ chunkenc.Encoding) error {
require.NoError(t, hrw.IterateAllChunks(func(_ HeadSeriesRef, _ ChunkDiskMapperRef, _, _ int64, _ uint16, _ chunkenc.Encoding, _ bool) error {
return nil
}))
require.True(t, hrw.fileMaxtSet)
@ -517,9 +521,8 @@ func createChunk(t *testing.T, idx int, hrw *ChunkDiskMapper) (seriesRef HeadSer
awaitCb := make(chan struct{})
if rand.Intn(2) == 0 {
isOOO = true
chunk = &chunkenc.OOOXORChunk{XORChunk: chunk.(*chunkenc.XORChunk)}
}
chunkRef = hrw.WriteChunk(seriesRef, mint, maxt, chunk, func(cbErr error) {
chunkRef = hrw.WriteChunk(seriesRef, mint, maxt, chunk, isOOO, func(cbErr error) {
require.NoError(t, err)
close(awaitCb)
})

View file

@ -784,10 +784,9 @@ func (h *Head) loadMmappedChunks(refSeries map[chunks.HeadSeriesRef]*memSeries)
mmappedChunks := map[chunks.HeadSeriesRef][]*mmappedChunk{}
oooMmappedChunks := map[chunks.HeadSeriesRef][]*mmappedChunk{}
var lastRef, secondLastRef chunks.ChunkDiskMapperRef
if err := h.chunkDiskMapper.IterateAllChunks(func(seriesRef chunks.HeadSeriesRef, chunkRef chunks.ChunkDiskMapperRef, mint, maxt int64, numSamples uint16, encoding chunkenc.Encoding) error {
if err := h.chunkDiskMapper.IterateAllChunks(func(seriesRef chunks.HeadSeriesRef, chunkRef chunks.ChunkDiskMapperRef, mint, maxt int64, numSamples uint16, encoding chunkenc.Encoding, isOOO bool) error {
secondLastRef = lastRef
lastRef = chunkRef
isOOO := chunkenc.IsOutOfOrderChunk(encoding)
if !isOOO && maxt < h.minValidTime.Load() {
return nil
}

View file

@ -1453,8 +1453,7 @@ func (s *memSeries) mmapCurrentOOOHeadChunk(chunkDiskMapper *chunks.ChunkDiskMap
return 0
}
xor, _ := s.ooo.oooHeadChunk.chunk.ToXOR() // Encode to XorChunk which is more compact and implements all of the needed functionality.
oooXor := &chunkenc.OOOXORChunk{XORChunk: xor}
chunkRef := chunkDiskMapper.WriteChunk(s.ref, s.ooo.oooHeadChunk.minTime, s.ooo.oooHeadChunk.maxTime, oooXor, handleChunkWriteError)
chunkRef := chunkDiskMapper.WriteChunk(s.ref, s.ooo.oooHeadChunk.minTime, s.ooo.oooHeadChunk.maxTime, xor, true, handleChunkWriteError)
s.ooo.oooMmappedChunks = append(s.ooo.oooMmappedChunks, &mmappedChunk{
ref: chunkRef,
numSamples: uint16(xor.NumSamples()),
@ -1471,7 +1470,7 @@ func (s *memSeries) mmapCurrentHeadChunk(chunkDiskMapper *chunks.ChunkDiskMapper
return
}
chunkRef := chunkDiskMapper.WriteChunk(s.ref, s.headChunk.minTime, s.headChunk.maxTime, s.headChunk.chunk, handleChunkWriteError)
chunkRef := chunkDiskMapper.WriteChunk(s.ref, s.headChunk.minTime, s.headChunk.maxTime, s.headChunk.chunk, false, handleChunkWriteError)
s.mmappedChunks = append(s.mmappedChunks, &mmappedChunk{
ref: chunkRef,
numSamples: uint16(s.headChunk.chunk.NumSamples()),

View file

@ -69,7 +69,7 @@ func newTestHead(t testing.TB, chunkRange int64, compressWAL, oooEnabled bool) (
h, err := NewHead(nil, nil, wal, nil, opts, nil)
require.NoError(t, err)
require.NoError(t, h.chunkDiskMapper.IterateAllChunks(func(_ chunks.HeadSeriesRef, _ chunks.ChunkDiskMapperRef, _, _ int64, _ uint16, _ chunkenc.Encoding) error {
require.NoError(t, h.chunkDiskMapper.IterateAllChunks(func(_ chunks.HeadSeriesRef, _ chunks.ChunkDiskMapperRef, _, _ int64, _ uint16, _ chunkenc.Encoding, _ bool) error {
return nil
}))
@ -4177,7 +4177,7 @@ func TestHeadInit_DiscardChunksWithUnsupportedEncoding(t *testing.T) {
uc := newUnsupportedChunk()
// Make this chunk not overlap with the previous and the next
h.chunkDiskMapper.WriteChunk(chunks.HeadSeriesRef(seriesRef), 500, 600, uc, func(err error) { require.NoError(t, err) })
h.chunkDiskMapper.WriteChunk(chunks.HeadSeriesRef(seriesRef), 500, 600, uc, false, func(err error) { require.NoError(t, err) })
app = h.Appender(ctx)
for i := 700; i < 1200; i++ {