From c02b13b7f4a145ce48475f7ceab21e3ad32802ca Mon Sep 17 00:00:00 2001
From: Jesus Vazquez
Date: Tue, 12 Apr 2022 12:35:10 +0200
Subject: [PATCH] Discard unknown chunk encodings (#196)

* Chunk replay skips chunks with unknown encodings

We've changed the logic of loadMmappedChunks to skip chunks that have
unknown encodings. To do so, we've modified IterateAllChunks to pass an
extra encoding argument to the callback function.

Also added unit tests in the head and chunk disk mapper.

* Also add a unit test for the old chunk disk mapper

* s/createUnsupportedChunk/writeUnsupportedChunk/g
---
 tsdb/chunks/head_chunks.go          |   9 +--
 tsdb/chunks/head_chunks_test.go     | 101 ++++++++++++++++++++++++++--
 tsdb/chunks/old_head_chunks.go      |   8 ++-
 tsdb/chunks/old_head_chunks_test.go |  55 +++++++++++++--
 tsdb/head.go                        |  10 ++-
 tsdb/head_test.go                   |  80 +++++++++++++++++++++-
 6 files changed, 243 insertions(+), 20 deletions(-)

diff --git a/tsdb/chunks/head_chunks.go b/tsdb/chunks/head_chunks.go
index 4a849f844b..78bc074928 100644
--- a/tsdb/chunks/head_chunks.go
+++ b/tsdb/chunks/head_chunks.go
@@ -714,7 +714,7 @@ func (cdm *ChunkDiskMapper) Chunk(ref ChunkDiskMapperRef) (chunkenc.Chunk, error
 // and runs the provided function with information about each chunk. It returns on the first error encountered.
 // NOTE: This method needs to be called at least once after creating ChunkDiskMapper
 // to set the maxt of all the file.
-func (cdm *ChunkDiskMapper) IterateAllChunks(f func(seriesRef HeadSeriesRef, chunkRef ChunkDiskMapperRef, mint, maxt int64, numSamples uint16) error) (err error) {
+func (cdm *ChunkDiskMapper) IterateAllChunks(f func(seriesRef HeadSeriesRef, chunkRef ChunkDiskMapperRef, mint, maxt int64, numSamples uint16, encoding chunkenc.Encoding) error) (err error) {
 	cdm.writePathMtx.Lock()
 	defer cdm.writePathMtx.Unlock()
 
@@ -776,8 +776,9 @@ func (cdm *ChunkDiskMapper) IterateAllChunks(f func(seriesRef HeadSeriesRef, chu
 			if seriesRef == 0 && mint == 0 && maxt == 0 {
 				break
 			}
-
-			idx += ChunkEncodingSize // Skip encoding.
+			// Encoding.
+			chkEnc := chunkenc.Encoding(mmapFile.byteSlice.Range(idx, idx+ChunkEncodingSize)[0])
+			idx += ChunkEncodingSize
 
 			dataLen, n := binary.Uvarint(mmapFile.byteSlice.Range(idx, idx+MaxChunkLengthFieldSize))
 			idx += n
@@ -812,7 +813,7 @@ func (cdm *ChunkDiskMapper) IterateAllChunks(f func(seriesRef HeadSeriesRef, chu
 				mmapFile.maxt = maxt
 			}
 
-			if err := f(seriesRef, chunkRef, mint, maxt, numSamples); err != nil {
+			if err := f(seriesRef, chunkRef, mint, maxt, numSamples, chkEnc); err != nil {
 				if cerr, ok := err.(*CorruptionErr); ok {
 					cerr.Dir = cdm.dir.Name()
 					cerr.FileIndex = segID
diff --git a/tsdb/chunks/head_chunks_test.go b/tsdb/chunks/head_chunks_test.go
index 367fc0d002..cb1358e258 100644
--- a/tsdb/chunks/head_chunks_test.go
+++ b/tsdb/chunks/head_chunks_test.go
@@ -15,13 +15,13 @@ package chunks
 
 import (
 	"encoding/binary"
-	"errors"
 	"io/ioutil"
 	"math/rand"
 	"os"
 	"strconv"
 	"testing"
 
+	"github.com/pkg/errors"
 	"github.com/stretchr/testify/require"
 
 	"github.com/prometheus/prometheus/tsdb/chunkenc"
@@ -132,13 +132,13 @@ func TestChunkDiskMapper_WriteChunk_Chunk_IterateChunks(t *testing.T) {
 	hrw = createChunkDiskMapper(t, dir)
 
 	idx := 0
-	require.NoError(t, hrw.IterateAllChunks(func(seriesRef HeadSeriesRef, chunkRef ChunkDiskMapperRef, mint, maxt int64, numSamples uint16) error {
+	require.NoError(t, hrw.IterateAllChunks(func(seriesRef HeadSeriesRef, chunkRef ChunkDiskMapperRef, mint, maxt int64, numSamples uint16, encoding chunkenc.Encoding) error {
 		t.Helper()
 
 		expData := expectedData[idx]
 		require.Equal(t, expData.seriesRef, seriesRef)
 		require.Equal(t, expData.chunkRef, chunkRef)
-		require.Equal(t, expData.maxt, maxt)
+		require.Equal(t, expData.mint, mint)
 		require.Equal(t, expData.maxt, maxt)
 		require.Equal(t, expData.numSamples, numSamples)
 
@@ -152,6 +152,44 @@ func TestChunkDiskMapper_WriteChunk_Chunk_IterateChunks(t *testing.T) {
 	require.Equal(t, len(expectedData), idx)
 }
 
+func TestChunkDiskMapper_WriteUnsupportedChunk_Chunk_IterateChunks(t *testing.T) {
+	hrw := createChunkDiskMapper(t, "")
+	defer func() {
+		require.NoError(t, hrw.Close())
+	}()
+
+	ucSeriesRef, ucChkRef, ucMint, ucMaxt, uchunk := writeUnsupportedChunk(t, 0, hrw, nil)
+
+	// Checking on-disk bytes for the first file.
+	require.Equal(t, 1, len(hrw.mmappedChunkFiles), "expected 1 mmapped file, got %d", len(hrw.mmappedChunkFiles))
+	require.Equal(t, len(hrw.mmappedChunkFiles), len(hrw.closers))
+
+	// Testing IterateAllChunks method.
+	dir := hrw.dir.Name()
+	require.NoError(t, hrw.Close())
+	hrw = createChunkDiskMapper(t, dir)
+
+	require.NoError(t, hrw.IterateAllChunks(func(seriesRef HeadSeriesRef, chunkRef ChunkDiskMapperRef, mint, maxt int64, numSamples uint16, encoding chunkenc.Encoding) error {
+		t.Helper()
+
+		require.Equal(t, ucSeriesRef, seriesRef)
+		require.Equal(t, ucChkRef, chunkRef)
+		require.Equal(t, ucMint, mint)
+		require.Equal(t, ucMaxt, maxt)
+		require.Equal(t, uchunk.Encoding(), encoding) // Asserts that the encoding is EncUnsupportedXOR.
+
+		actChunk, err := hrw.Chunk(chunkRef)
+		// The chunk encoding is unknown, so Chunk() should fail, but we, the
+		// caller, are OK with that. Above we asserted that the encoding we
+		// expected was EncUnsupportedXOR.
+		require.NotNil(t, err)
+		require.Contains(t, err.Error(), "invalid chunk encoding \"<unknown>\"")
+		require.Nil(t, actChunk)
+
+		return nil
+	}))
+}
+
 // TestChunkDiskMapper_Truncate tests
 // * If truncation is happening properly based on the time passed.
 // * The active file is not deleted even if the passed time makes it eligible to be deleted.
@@ -366,7 +404,7 @@ func TestHeadReadWriter_TruncateAfterFailedIterateChunks(t *testing.T) {
 	hrw = createChunkDiskMapper(t, dir)
 
 	// Forcefully failing IterateAllChunks.
-	require.Error(t, hrw.IterateAllChunks(func(_ HeadSeriesRef, _ ChunkDiskMapperRef, _, _ int64, _ uint16) error {
+	require.Error(t, hrw.IterateAllChunks(func(_ HeadSeriesRef, _ ChunkDiskMapperRef, _, _ int64, _ uint16, _ chunkenc.Encoding) error {
 		return errors.New("random error")
 	}))
 
@@ -461,7 +499,9 @@ func createChunkDiskMapper(t *testing.T, dir string) *ChunkDiskMapper {
 	hrw, err := NewChunkDiskMapper(nil, dir, chunkenc.NewPool(), DefaultWriteBufferSize, DefaultWriteQueueSize)
 	require.NoError(t, err)
 	require.False(t, hrw.fileMaxtSet)
-	require.NoError(t, hrw.IterateAllChunks(func(_ HeadSeriesRef, _ ChunkDiskMapperRef, _, _ int64, _ uint16) error { return nil }))
+	require.NoError(t, hrw.IterateAllChunks(func(_ HeadSeriesRef, _ ChunkDiskMapperRef, _, _ int64, _ uint16, _ chunkenc.Encoding) error {
+		return nil
+	}))
 	require.True(t, hrw.fileMaxtSet)
 
 	return hrw
@@ -478,6 +518,17 @@ func randomChunk(t *testing.T) chunkenc.Chunk {
 	return chunk
 }
 
+func randomUnsupportedChunk(t *testing.T) chunkenc.Chunk {
+	chunk := newUnsupportedChunk()
+	numSamples := rand.Int() % 120
+	app, err := chunk.Appender()
+	require.NoError(t, err)
+	for i := 0; i < numSamples; i++ {
+		app.Append(rand.Int63(), rand.Float64())
+	}
+	return chunk
+}
+
 func createChunk(t *testing.T, idx int, hrw *ChunkDiskMapper) (seriesRef HeadSeriesRef, chunkRef ChunkDiskMapperRef, mint, maxt int64, chunk chunkenc.Chunk) {
 	var err error
 	seriesRef = HeadSeriesRef(rand.Int63())
@@ -492,3 +543,43 @@ func createChunk(t *testing.T, idx int, hrw *ChunkDiskMapper) (seriesRef HeadSer
 	<-awaitCb
 	return
 }
+
+func writeUnsupportedChunk(t *testing.T, idx int, hrw *ChunkDiskMapper, hrwOld *OldChunkDiskMapper) (seriesRef HeadSeriesRef, chunkRef ChunkDiskMapperRef, mint, maxt int64, chunk chunkenc.Chunk) {
+	seriesRef = HeadSeriesRef(rand.Int63())
+	mint = int64((idx)*1000 + 1)
+	maxt = int64((idx + 1) * 1000)
+	chunk = randomUnsupportedChunk(t)
+	awaitCb := make(chan struct{})
+	if hrw != nil {
+		chunkRef = hrw.WriteChunk(seriesRef, mint, maxt, chunk, func(cbErr error) {
+			require.NoError(t, cbErr)
+			close(awaitCb)
+		})
+	} else {
+		chunkRef = hrwOld.WriteChunk(seriesRef, mint, maxt, chunk, func(cbErr error) {
+			require.NoError(t, cbErr)
+		})
+		close(awaitCb)
+	}
+	<-awaitCb
+	return
+}
+
+const (
+	UnsupportedMask   = 0b10000000
+	EncUnsupportedXOR = chunkenc.EncXOR | UnsupportedMask
+)
+
+// unsupportedChunk wraps an XORChunk and overrides the Encoding() method.
+type unsupportedChunk struct {
+	*chunkenc.XORChunk
+}
+
+func newUnsupportedChunk() *unsupportedChunk {
+	return &unsupportedChunk{chunkenc.NewXORChunk()}
+}
+
+func (c *unsupportedChunk) Encoding() chunkenc.Encoding {
+	return EncUnsupportedXOR
+}
diff --git a/tsdb/chunks/old_head_chunks.go b/tsdb/chunks/old_head_chunks.go
index c169d48bed..5301fd1ace 100644
--- a/tsdb/chunks/old_head_chunks.go
+++ b/tsdb/chunks/old_head_chunks.go
@@ -473,7 +473,7 @@ func (cdm *OldChunkDiskMapper) Chunk(ref ChunkDiskMapperRef) (chunkenc.Chunk, er
 // and runs the provided function with information about each chunk. It returns on the first error encountered.
 // NOTE: This method needs to be called at least once after creating ChunkDiskMapper
 // to set the maxt of all the file.
-func (cdm *OldChunkDiskMapper) IterateAllChunks(f func(seriesRef HeadSeriesRef, chunkRef ChunkDiskMapperRef, mint, maxt int64, numSamples uint16) error) (err error) {
+func (cdm *OldChunkDiskMapper) IterateAllChunks(f func(seriesRef HeadSeriesRef, chunkRef ChunkDiskMapperRef, mint, maxt int64, numSamples uint16, encoding chunkenc.Encoding) error) (err error) {
 	cdm.writePathMtx.Lock()
 	defer cdm.writePathMtx.Unlock()
 
@@ -536,7 +536,9 @@ func (cdm *OldChunkDiskMapper) IterateAllChunks(f func(seriesRef HeadSeriesRef,
 				break
 			}
 
-			idx += ChunkEncodingSize // Skip encoding.
+			// Encoding.
+			chkEnc := chunkenc.Encoding(mmapFile.byteSlice.Range(idx, idx+ChunkEncodingSize)[0])
+			idx += ChunkEncodingSize
 
 			dataLen, n := binary.Uvarint(mmapFile.byteSlice.Range(idx, idx+MaxChunkLengthFieldSize))
 			idx += n
@@ -571,7 +573,7 @@ func (cdm *OldChunkDiskMapper) IterateAllChunks(f func(seriesRef HeadSeriesRef,
 			mmapFile.maxt = maxt
 		}
 
-		if err := f(seriesRef, chunkRef, mint, maxt, numSamples); err != nil {
+		if err := f(seriesRef, chunkRef, mint, maxt, numSamples, chkEnc); err != nil {
 			if cerr, ok := err.(*CorruptionErr); ok {
 				cerr.Dir = cdm.dir.Name()
 				cerr.FileIndex = segID
diff --git a/tsdb/chunks/old_head_chunks_test.go b/tsdb/chunks/old_head_chunks_test.go
index 4a1943bf05..d81ef3df22 100644
--- a/tsdb/chunks/old_head_chunks_test.go
+++ b/tsdb/chunks/old_head_chunks_test.go
@@ -133,7 +133,7 @@ func TestOldChunkDiskMapper_WriteChunk_Chunk_IterateChunks(t *testing.T) {
 	require.NoError(t, err)
 
 	idx := 0
-	require.NoError(t, hrw.IterateAllChunks(func(seriesRef HeadSeriesRef, chunkRef ChunkDiskMapperRef, mint, maxt int64, numSamples uint16) error {
+	require.NoError(t, hrw.IterateAllChunks(func(seriesRef HeadSeriesRef, chunkRef ChunkDiskMapperRef, mint, maxt int64, numSamples uint16, encoding chunkenc.Encoding) error {
 		t.Helper()
 
 		expData := expectedData[idx]
@@ -153,6 +153,45 @@ func TestOldChunkDiskMapper_WriteChunk_Chunk_IterateChunks(t *testing.T) {
 	require.Equal(t, len(expectedData), idx)
 }
 
+func TestOldChunkDiskMapper_WriteUnsupportedChunk_Chunk_IterateChunks(t *testing.T) {
+	hrw := testOldChunkDiskMapper(t)
+	defer func() {
+		require.NoError(t, hrw.Close())
+	}()
+
+	ucSeriesRef, ucChkRef, ucMint, ucMaxt, uchunk := writeUnsupportedChunk(t, 0, nil, hrw)
+
+	// Checking on-disk bytes for the first file.
+	require.Equal(t, 1, len(hrw.mmappedChunkFiles), "expected 1 mmapped file, got %d", len(hrw.mmappedChunkFiles))
+	require.Equal(t, len(hrw.mmappedChunkFiles), len(hrw.closers))
+
+	// Testing IterateAllChunks method.
+	dir := hrw.dir.Name()
+	require.NoError(t, hrw.Close())
+	hrw, err := NewOldChunkDiskMapper(dir, chunkenc.NewPool(), DefaultWriteBufferSize)
+	require.NoError(t, err)
+
+	require.NoError(t, hrw.IterateAllChunks(func(seriesRef HeadSeriesRef, chunkRef ChunkDiskMapperRef, mint, maxt int64, numSamples uint16, encoding chunkenc.Encoding) error {
+		t.Helper()
+
+		require.Equal(t, ucSeriesRef, seriesRef)
+		require.Equal(t, ucChkRef, chunkRef)
+		require.Equal(t, ucMint, mint)
+		require.Equal(t, ucMaxt, maxt)
+		require.Equal(t, uchunk.Encoding(), encoding) // Asserts that the encoding is EncUnsupportedXOR.
+
+		actChunk, err := hrw.Chunk(chunkRef)
+		// The chunk encoding is unknown, so Chunk() should fail, but we, the
+		// caller, are OK with that.
+		// Above we asserted that the encoding we expected was EncUnsupportedXOR.
+		require.NotNil(t, err)
+		require.Contains(t, err.Error(), "invalid chunk encoding \"<unknown>\"")
+		require.Nil(t, actChunk)
+
+		return nil
+	}))
+}
+
 // TestOldChunkDiskMapper_Truncate tests
 // * If truncation is happening properly based on the time passed.
 // * The active file is not deleted even if the passed time makes it eligible to be deleted.
@@ -222,7 +261,9 @@ func TestOldChunkDiskMapper_Truncate(t *testing.T) {
 		require.NoError(t, err)
 
 		require.False(t, hrw.fileMaxtSet)
-		require.NoError(t, hrw.IterateAllChunks(func(_ HeadSeriesRef, _ ChunkDiskMapperRef, _, _ int64, _ uint16) error { return nil }))
+		require.NoError(t, hrw.IterateAllChunks(func(_ HeadSeriesRef, _ ChunkDiskMapperRef, _, _ int64, _ uint16, _ chunkenc.Encoding) error {
+			return nil
+		}))
 		require.True(t, hrw.fileMaxtSet)
 
 		verifyFiles([]int{3, 4, 5, 6, 7, 8})
@@ -338,7 +379,7 @@ func TestOldChunkDiskMapper_TruncateAfterFailedIterateChunks(t *testing.T) {
 	require.NoError(t, err)
 
 	// Forcefully failing IterateAllChunks.
-	require.Error(t, hrw.IterateAllChunks(func(_ HeadSeriesRef, _ ChunkDiskMapperRef, _, _ int64, _ uint16) error {
+	require.Error(t, hrw.IterateAllChunks(func(_ HeadSeriesRef, _ ChunkDiskMapperRef, _, _ int64, _ uint16, _ chunkenc.Encoding) error {
 		return errors.New("random error")
 	}))
 
@@ -395,7 +436,9 @@ func TestOldChunkDiskMapper_ReadRepairOnEmptyLastFile(t *testing.T) {
 	hrw, err = NewOldChunkDiskMapper(dir, chunkenc.NewPool(), DefaultWriteBufferSize)
 	require.NoError(t, err)
 	require.False(t, hrw.fileMaxtSet)
-	require.NoError(t, hrw.IterateAllChunks(func(_ HeadSeriesRef, _ ChunkDiskMapperRef, _, _ int64, _ uint16) error { return nil }))
+	require.NoError(t, hrw.IterateAllChunks(func(_ HeadSeriesRef, _ ChunkDiskMapperRef, _, _ int64, _ uint16, _ chunkenc.Encoding) error {
+		return nil
+	}))
 	require.True(t, hrw.fileMaxtSet)
 
 	// Removed from memory.
@@ -425,7 +468,9 @@ func testOldChunkDiskMapper(t *testing.T) *OldChunkDiskMapper {
 	hrw, err := NewOldChunkDiskMapper(tmpdir, chunkenc.NewPool(), DefaultWriteBufferSize)
 	require.NoError(t, err)
 	require.False(t, hrw.fileMaxtSet)
-	require.NoError(t, hrw.IterateAllChunks(func(_ HeadSeriesRef, _ ChunkDiskMapperRef, _, _ int64, _ uint16) error { return nil }))
+	require.NoError(t, hrw.IterateAllChunks(func(_ HeadSeriesRef, _ ChunkDiskMapperRef, _, _ int64, _ uint16, _ chunkenc.Encoding) error {
+		return nil
+	}))
 	require.True(t, hrw.fileMaxtSet)
 	return hrw
 }
diff --git a/tsdb/head.go b/tsdb/head.go
index 6aeef2e50e..21eb241528 100644
--- a/tsdb/head.go
+++ b/tsdb/head.go
@@ -61,7 +61,7 @@ var (
 // 0 size queue to queue based chunk disk mapper.
 type chunkDiskMapper interface {
 	CutNewFile() (returnErr error)
-	IterateAllChunks(f func(seriesRef chunks.HeadSeriesRef, chunkRef chunks.ChunkDiskMapperRef, mint, maxt int64, numSamples uint16) error) (err error)
+	IterateAllChunks(f func(seriesRef chunks.HeadSeriesRef, chunkRef chunks.ChunkDiskMapperRef, mint, maxt int64, numSamples uint16, encoding chunkenc.Encoding) error) (err error)
 	Truncate(mint int64) error
 	DeleteCorrupted(originalErr error) error
 	Size() (int64, error)
@@ -662,10 +662,16 @@ func (h *Head) Init(minValidTime int64) error {
 
 func (h *Head) loadMmappedChunks(refSeries map[chunks.HeadSeriesRef]*memSeries) (map[chunks.HeadSeriesRef][]*mmappedChunk, error) {
 	mmappedChunks := map[chunks.HeadSeriesRef][]*mmappedChunk{}
-	if err := h.chunkDiskMapper.IterateAllChunks(func(seriesRef chunks.HeadSeriesRef, chunkRef chunks.ChunkDiskMapperRef, mint, maxt int64, numSamples uint16) error {
+	if err := h.chunkDiskMapper.IterateAllChunks(func(seriesRef chunks.HeadSeriesRef, chunkRef chunks.ChunkDiskMapperRef, mint, maxt int64, numSamples uint16, encoding chunkenc.Encoding) error {
 		if maxt < h.minValidTime.Load() {
 			return nil
 		}
+
+		// We ignore any chunk that doesn't have a valid encoding.
+		if encoding != chunkenc.EncXOR {
+			return nil
+		}
+
 		ms, ok := refSeries[seriesRef]
 		if !ok {
 			slice := mmappedChunks[seriesRef]
diff --git a/tsdb/head_test.go b/tsdb/head_test.go
index 8c882ab7b2..755f17df61 100644
--- a/tsdb/head_test.go
+++ b/tsdb/head_test.go
@@ -65,7 +65,9 @@ func newTestHead(t testing.TB, chunkRange int64, compressWAL bool) (*Head, *wal.
 	h, err := NewHead(nil, nil, wlog, opts, nil)
 	require.NoError(t, err)
 
-	require.NoError(t, h.chunkDiskMapper.IterateAllChunks(func(_ chunks.HeadSeriesRef, _ chunks.ChunkDiskMapperRef, _, _ int64, _ uint16) error { return nil }))
+	require.NoError(t, h.chunkDiskMapper.IterateAllChunks(func(_ chunks.HeadSeriesRef, _ chunks.ChunkDiskMapperRef, _, _ int64, _ uint16, _ chunkenc.Encoding) error {
+		return nil
+	}))
 
 	t.Cleanup(func() {
 		require.NoError(t, os.RemoveAll(dir))
@@ -3211,3 +3213,79 @@ func TestChunkSnapshotTakenAfterIncompleteSnapshot(t *testing.T) {
 	require.Equal(t, 0, idx)
 	require.Greater(t, offset, 0)
 }
+
+func TestHeadInit_DiscardChunksWithUnsupportedEncoding(t *testing.T) {
+	h, _ := newTestHead(t, 1000, false)
+	defer func() {
+		require.NoError(t, h.Close())
+	}()
+
+	require.NoError(t, h.Init(0))
+
+	ctx := context.Background()
+	app := h.Appender(ctx)
+	seriesLabels := labels.FromStrings("a", "1")
+	var seriesRef storage.SeriesRef
+	var err error
+	for i := 0; i < 400; i++ {
+		seriesRef, err = app.Append(0, seriesLabels, int64(i), float64(i))
+		require.NoError(t, err)
+	}
+
+	require.NoError(t, app.Commit())
+	require.Greater(t, prom_testutil.ToFloat64(h.metrics.chunksCreated), 1.0)
+
+	uc := newUnsupportedChunk()
+	// Make this chunk not overlap with the previous and the next ones.
+	h.chunkDiskMapper.WriteChunk(chunks.HeadSeriesRef(seriesRef), 500, 600, uc, func(err error) { require.NoError(t, err) })
+
+	app = h.Appender(ctx)
+	for i := 700; i < 1200; i++ {
+		_, err := app.Append(0, seriesLabels, int64(i), float64(i))
+		require.NoError(t, err)
+	}
+
+	require.NoError(t, app.Commit())
+	require.Greater(t, prom_testutil.ToFloat64(h.metrics.chunksCreated), 4.0)
+
+	series, created, err := h.getOrCreate(seriesLabels.Hash(), seriesLabels)
+	require.NoError(t, err)
+	require.False(t, created, "should already exist")
+	require.NotNil(t, series, "should return the series we created above")
+
+	expChunks := make([]*mmappedChunk, len(series.mmappedChunks))
+	copy(expChunks, series.mmappedChunks)
+
+	require.NoError(t, h.Close())
+
+	wlog, err := wal.NewSize(nil, nil, filepath.Join(h.opts.ChunkDirRoot, "wal"), 32768, false)
+	require.NoError(t, err)
+	h, err = NewHead(nil, nil, wlog, h.opts, nil)
+	require.NoError(t, err)
+	require.NoError(t, h.Init(0))
+
+	series, created, err = h.getOrCreate(seriesLabels.Hash(), seriesLabels)
+	require.NoError(t, err)
+	require.False(t, created, "should already exist")
+	require.NotNil(t, series, "should return the series we created above")
+
+	require.Equal(t, expChunks, series.mmappedChunks)
+}
+
+const (
+	UnsupportedMask   = 0b10000000
+	EncUnsupportedXOR = chunkenc.EncXOR | UnsupportedMask
+)
+
+// unsupportedChunk wraps an XORChunk and overrides the Encoding() method.
+type unsupportedChunk struct {
+	*chunkenc.XORChunk
+}
+
+func newUnsupportedChunk() *unsupportedChunk {
+	return &unsupportedChunk{chunkenc.NewXORChunk()}
+}
+
+func (c *unsupportedChunk) Encoding() chunkenc.Encoding {
+	return EncUnsupportedXOR
+}
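
Editor's note on the new callback contract: after this patch, every IterateAllChunks
caller receives each chunk's encoding byte and can decide per chunk whether it knows
how to decode it, instead of replay aborting on the first unknown encoding (for
example, one written by a newer version). A minimal sketch of such a caller, mirroring
the loadMmappedChunks change above; the skipped counter and the enclosing method are
hypothetical, not part of this patch:

	// countDecodableChunks walks all m-mapped chunks and reports how many
	// were skipped because their encoding is unknown to this version.
	func (h *Head) countDecodableChunks() (skipped int, err error) {
		err = h.chunkDiskMapper.IterateAllChunks(func(seriesRef chunks.HeadSeriesRef, chunkRef chunks.ChunkDiskMapperRef, mint, maxt int64, numSamples uint16, encoding chunkenc.Encoding) error {
			// Only XOR is treated as a known encoding here, matching the
			// loadMmappedChunks check; anything else is skipped, not fatal.
			if encoding != chunkenc.EncXOR {
				skipped++
				return nil
			}
			// ... index the chunk for replay, as loadMmappedChunks does ...
			return nil
		})
		return skipped, err
	}

Returning nil for unknown encodings (rather than an error) is what makes the chunk
"discarded": IterateAllChunks stops on the first error, so any non-nil return would
abort the whole replay instead of skipping the single chunk.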