diff --git a/tsdb/chunks/head_chunks.go b/tsdb/chunks/head_chunks.go
index 4a849f844b..78bc074928 100644
--- a/tsdb/chunks/head_chunks.go
+++ b/tsdb/chunks/head_chunks.go
@@ -714,7 +714,7 @@ func (cdm *ChunkDiskMapper) Chunk(ref ChunkDiskMapperRef) (chunkenc.Chunk, error
 // and runs the provided function with information about each chunk. It returns on the first error encountered.
 // NOTE: This method needs to be called at least once after creating ChunkDiskMapper
 // to set the maxt of all the file.
-func (cdm *ChunkDiskMapper) IterateAllChunks(f func(seriesRef HeadSeriesRef, chunkRef ChunkDiskMapperRef, mint, maxt int64, numSamples uint16) error) (err error) {
+func (cdm *ChunkDiskMapper) IterateAllChunks(f func(seriesRef HeadSeriesRef, chunkRef ChunkDiskMapperRef, mint, maxt int64, numSamples uint16, encoding chunkenc.Encoding) error) (err error) {
 	cdm.writePathMtx.Lock()
 	defer cdm.writePathMtx.Unlock()
 
@@ -776,8 +776,9 @@ func (cdm *ChunkDiskMapper) IterateAllChunks(f func(seriesRef HeadSeriesRef, chu
 			if seriesRef == 0 && mint == 0 && maxt == 0 {
 				break
 			}
-
-			idx += ChunkEncodingSize // Skip encoding.
+			// Encoding.
+			chkEnc := chunkenc.Encoding(mmapFile.byteSlice.Range(idx, idx+ChunkEncodingSize)[0])
+			idx += ChunkEncodingSize
 
 			dataLen, n := binary.Uvarint(mmapFile.byteSlice.Range(idx, idx+MaxChunkLengthFieldSize))
 			idx += n
@@ -812,7 +813,7 @@ func (cdm *ChunkDiskMapper) IterateAllChunks(f func(seriesRef HeadSeriesRef, chu
 				mmapFile.maxt = maxt
 			}
 
-			if err := f(seriesRef, chunkRef, mint, maxt, numSamples); err != nil {
+			if err := f(seriesRef, chunkRef, mint, maxt, numSamples, chkEnc); err != nil {
 				if cerr, ok := err.(*CorruptionErr); ok {
 					cerr.Dir = cdm.dir.Name()
 					cerr.FileIndex = segID
diff --git a/tsdb/chunks/head_chunks_test.go b/tsdb/chunks/head_chunks_test.go
index 367fc0d002..cb1358e258 100644
--- a/tsdb/chunks/head_chunks_test.go
+++ b/tsdb/chunks/head_chunks_test.go
@@ -15,13 +15,13 @@ package chunks
 
 import (
 	"encoding/binary"
-	"errors"
 	"io/ioutil"
 	"math/rand"
 	"os"
 	"strconv"
 	"testing"
 
+	"github.com/pkg/errors"
 	"github.com/stretchr/testify/require"
 
 	"github.com/prometheus/prometheus/tsdb/chunkenc"
@@ -132,13 +132,13 @@ func TestChunkDiskMapper_WriteChunk_Chunk_IterateChunks(t *testing.T) {
 	hrw = createChunkDiskMapper(t, dir)
 
 	idx := 0
-	require.NoError(t, hrw.IterateAllChunks(func(seriesRef HeadSeriesRef, chunkRef ChunkDiskMapperRef, mint, maxt int64, numSamples uint16) error {
+	require.NoError(t, hrw.IterateAllChunks(func(seriesRef HeadSeriesRef, chunkRef ChunkDiskMapperRef, mint, maxt int64, numSamples uint16, encoding chunkenc.Encoding) error {
 		t.Helper()
 
 		expData := expectedData[idx]
 		require.Equal(t, expData.seriesRef, seriesRef)
 		require.Equal(t, expData.chunkRef, chunkRef)
-		require.Equal(t, expData.maxt, maxt)
+		require.Equal(t, expData.mint, mint)
 		require.Equal(t, expData.maxt, maxt)
 		require.Equal(t, expData.numSamples, numSamples)
 
@@ -152,6 +152,44 @@ func TestChunkDiskMapper_WriteChunk_Chunk_IterateChunks(t *testing.T) {
 	require.Equal(t, len(expectedData), idx)
 }
 
+func TestChunkDiskMapper_WriteUnsupportedChunk_Chunk_IterateChunks(t *testing.T) {
+	hrw := createChunkDiskMapper(t, "")
+	defer func() {
+		require.NoError(t, hrw.Close())
+	}()
+
+	ucSeriesRef, ucChkRef, ucMint, ucMaxt, uchunk := writeUnsupportedChunk(t, 0, hrw, nil)
+
+	// Check that there is exactly one mmapped chunk file and a matching closer.
+	require.Equal(t, 1, len(hrw.mmappedChunkFiles), "expected 1 mmapped file, got %d", len(hrw.mmappedChunkFiles))
+	require.Equal(t, len(hrw.mmappedChunkFiles), len(hrw.closers))
+
+	// Testing IterateAllChunks method.
+	dir := hrw.dir.Name()
+	require.NoError(t, hrw.Close())
+	hrw = createChunkDiskMapper(t, dir)
+
+	require.NoError(t, hrw.IterateAllChunks(func(seriesRef HeadSeriesRef, chunkRef ChunkDiskMapperRef, mint, maxt int64, numSamples uint16, encoding chunkenc.Encoding) error {
+		t.Helper()
+
+		require.Equal(t, ucSeriesRef, seriesRef)
+		require.Equal(t, ucChkRef, chunkRef)
+		require.Equal(t, ucMint, mint)
+		require.Equal(t, ucMaxt, maxt)
+		require.Equal(t, uchunk.Encoding(), encoding) // Asserts that the encoding is EncUnsupportedXOR.
+
+		actChunk, err := hrw.Chunk(chunkRef)
+		// The chunk encoding is unknown, so Chunk() should fail, but we as
+		// the caller are OK with that. Above we asserted that the encoding is
+		// EncUnsupportedXOR.
+		require.NotNil(t, err)
+		require.Contains(t, err.Error(), "invalid chunk encoding \"<unknown>\"")
+		require.Nil(t, actChunk)
+
+		return nil
+	}))
+}
+
 // TestChunkDiskMapper_Truncate tests
 // * If truncation is happening properly based on the time passed.
 // * The active file is not deleted even if the passed time makes it eligible to be deleted.
@@ -366,7 +404,7 @@ func TestHeadReadWriter_TruncateAfterFailedIterateChunks(t *testing.T) {
 	hrw = createChunkDiskMapper(t, dir)
 
 	// Forcefully failing IterateAllChunks.
-	require.Error(t, hrw.IterateAllChunks(func(_ HeadSeriesRef, _ ChunkDiskMapperRef, _, _ int64, _ uint16) error {
+	require.Error(t, hrw.IterateAllChunks(func(_ HeadSeriesRef, _ ChunkDiskMapperRef, _, _ int64, _ uint16, _ chunkenc.Encoding) error {
 		return errors.New("random error")
 	}))
 
@@ -461,7 +499,9 @@ func createChunkDiskMapper(t *testing.T, dir string) *ChunkDiskMapper {
 	hrw, err := NewChunkDiskMapper(nil, dir, chunkenc.NewPool(), DefaultWriteBufferSize, DefaultWriteQueueSize)
 	require.NoError(t, err)
 	require.False(t, hrw.fileMaxtSet)
-	require.NoError(t, hrw.IterateAllChunks(func(_ HeadSeriesRef, _ ChunkDiskMapperRef, _, _ int64, _ uint16) error { return nil }))
+	require.NoError(t, hrw.IterateAllChunks(func(_ HeadSeriesRef, _ ChunkDiskMapperRef, _, _ int64, _ uint16, _ chunkenc.Encoding) error {
+		return nil
+	}))
 	require.True(t, hrw.fileMaxtSet)
 
 	return hrw
@@ -478,6 +518,17 @@ func randomChunk(t *testing.T) chunkenc.Chunk {
 	return chunk
 }
 
+func randomUnsupportedChunk(t *testing.T) chunkenc.Chunk {
+	chunk := newUnsupportedChunk()
+	len := rand.Int() % 120
+	app, err := chunk.Appender()
+	require.NoError(t, err)
+	for i := 0; i < len; i++ {
+		app.Append(rand.Int63(), rand.Float64())
+	}
+	return chunk
+}
+
 func createChunk(t *testing.T, idx int, hrw *ChunkDiskMapper) (seriesRef HeadSeriesRef, chunkRef ChunkDiskMapperRef, mint, maxt int64, chunk chunkenc.Chunk) {
 	var err error
 	seriesRef = HeadSeriesRef(rand.Int63())
@@ -492,3 +543,42 @@ func createChunk(t *testing.T, idx int, hrw *ChunkDiskMapper) (seriesRef HeadSer
 	<-awaitCb
 	return
 }
+
+func writeUnsupportedChunk(t *testing.T, idx int, hrw *ChunkDiskMapper, hrwOld *OldChunkDiskMapper) (seriesRef HeadSeriesRef, chunkRef ChunkDiskMapperRef, mint, maxt int64, chunk chunkenc.Chunk) {
+	seriesRef = HeadSeriesRef(rand.Int63())
+	mint = int64((idx)*1000 + 1)
+	maxt = int64((idx + 1) * 1000)
+	chunk = randomUnsupportedChunk(t)
+	awaitCb := make(chan struct{})
+	if hrw != nil {
+		chunkRef = hrw.WriteChunk(seriesRef, mint, maxt, chunk, func(cbErr error) {
+			require.NoError(t, cbErr)
+			close(awaitCb)
+		})
+	} else {
+		chunkRef = hrwOld.WriteChunk(seriesRef, mint, maxt, chunk, func(cbErr error) {
+			require.NoError(t, cbErr)
+		})
+		close(awaitCb)
+	}
+	<-awaitCb
+	return
+}
+
+const (
+	UnsupportedMask   = 0b10000000
+	EncUnsupportedXOR = chunkenc.EncXOR | UnsupportedMask
+)
+
+// unsupportedChunk holds an XORChunk and overrides the Encoding() method.
+type unsupportedChunk struct {
+	*chunkenc.XORChunk
+}
+
+func newUnsupportedChunk() *unsupportedChunk {
+	return &unsupportedChunk{chunkenc.NewXORChunk()}
+}
+
+func (c *unsupportedChunk) Encoding() chunkenc.Encoding {
+	return EncUnsupportedXOR
+}
diff --git a/tsdb/chunks/old_head_chunks.go b/tsdb/chunks/old_head_chunks.go
index c169d48bed..5301fd1ace 100644
--- a/tsdb/chunks/old_head_chunks.go
+++ b/tsdb/chunks/old_head_chunks.go
@@ -473,7 +473,7 @@ func (cdm *OldChunkDiskMapper) Chunk(ref ChunkDiskMapperRef) (chunkenc.Chunk, er
 // and runs the provided function with information about each chunk. It returns on the first error encountered.
 // NOTE: This method needs to be called at least once after creating ChunkDiskMapper
 // to set the maxt of all the file.
-func (cdm *OldChunkDiskMapper) IterateAllChunks(f func(seriesRef HeadSeriesRef, chunkRef ChunkDiskMapperRef, mint, maxt int64, numSamples uint16) error) (err error) {
+func (cdm *OldChunkDiskMapper) IterateAllChunks(f func(seriesRef HeadSeriesRef, chunkRef ChunkDiskMapperRef, mint, maxt int64, numSamples uint16, encoding chunkenc.Encoding) error) (err error) {
 	cdm.writePathMtx.Lock()
 	defer cdm.writePathMtx.Unlock()
 
@@ -536,7 +536,9 @@ func (cdm *OldChunkDiskMapper) IterateAllChunks(f func(seriesRef HeadSeriesRef,
 				break
 			}
 
-			idx += ChunkEncodingSize // Skip encoding.
+			// Encoding.
+			chkEnc := chunkenc.Encoding(mmapFile.byteSlice.Range(idx, idx+ChunkEncodingSize)[0])
+			idx += ChunkEncodingSize
 
 			dataLen, n := binary.Uvarint(mmapFile.byteSlice.Range(idx, idx+MaxChunkLengthFieldSize))
 			idx += n
@@ -571,7 +573,7 @@ func (cdm *OldChunkDiskMapper) IterateAllChunks(f func(seriesRef HeadSeriesRef,
 				mmapFile.maxt = maxt
 			}
 
-			if err := f(seriesRef, chunkRef, mint, maxt, numSamples); err != nil {
+			if err := f(seriesRef, chunkRef, mint, maxt, numSamples, chkEnc); err != nil {
 				if cerr, ok := err.(*CorruptionErr); ok {
 					cerr.Dir = cdm.dir.Name()
 					cerr.FileIndex = segID
diff --git a/tsdb/chunks/old_head_chunks_test.go b/tsdb/chunks/old_head_chunks_test.go
index 4a1943bf05..d81ef3df22 100644
--- a/tsdb/chunks/old_head_chunks_test.go
+++ b/tsdb/chunks/old_head_chunks_test.go
@@ -133,7 +133,7 @@ func TestOldChunkDiskMapper_WriteChunk_Chunk_IterateChunks(t *testing.T) {
 	require.NoError(t, err)
 
 	idx := 0
-	require.NoError(t, hrw.IterateAllChunks(func(seriesRef HeadSeriesRef, chunkRef ChunkDiskMapperRef, mint, maxt int64, numSamples uint16) error {
+	require.NoError(t, hrw.IterateAllChunks(func(seriesRef HeadSeriesRef, chunkRef ChunkDiskMapperRef, mint, maxt int64, numSamples uint16, encoding chunkenc.Encoding) error {
 		t.Helper()
 
 		expData := expectedData[idx]
@@ -153,6 +153,45 @@ func TestOldChunkDiskMapper_WriteChunk_Chunk_IterateChunks(t *testing.T) {
 	require.Equal(t, len(expectedData), idx)
 }
 
+func TestOldChunkDiskMapper_WriteUnsupportedChunk_Chunk_IterateChunks(t *testing.T) {
+	hrw := testOldChunkDiskMapper(t)
+	defer func() {
+		require.NoError(t, hrw.Close())
+	}()
+
+	ucSeriesRef, ucChkRef, ucMint, ucMaxt, uchunk := writeUnsupportedChunk(t, 0, nil, hrw)
+
+	// Check that there is exactly one mmapped chunk file and a matching closer.
+	require.Equal(t, 1, len(hrw.mmappedChunkFiles), "expected 1 mmapped file, got %d", len(hrw.mmappedChunkFiles))
+	require.Equal(t, len(hrw.mmappedChunkFiles), len(hrw.closers))
+
+	// Testing IterateAllChunks method.
+	dir := hrw.dir.Name()
+	require.NoError(t, hrw.Close())
+	hrw, err := NewOldChunkDiskMapper(dir, chunkenc.NewPool(), DefaultWriteBufferSize)
+	require.NoError(t, err)
+
+	require.NoError(t, hrw.IterateAllChunks(func(seriesRef HeadSeriesRef, chunkRef ChunkDiskMapperRef, mint, maxt int64, numSamples uint16, encoding chunkenc.Encoding) error {
+		t.Helper()
+
+		require.Equal(t, ucSeriesRef, seriesRef)
+		require.Equal(t, ucChkRef, chunkRef)
+		require.Equal(t, ucMint, mint)
+		require.Equal(t, ucMaxt, maxt)
+		require.Equal(t, uchunk.Encoding(), encoding) // Asserts that the encoding is EncUnsupportedXOR.
+
+		actChunk, err := hrw.Chunk(chunkRef)
+		// The chunk encoding is unknown, so Chunk() should fail, but we as
+		// the caller are OK with that. Above we asserted that the encoding is
+		// EncUnsupportedXOR.
+		require.NotNil(t, err)
+		require.Contains(t, err.Error(), "invalid chunk encoding \"<unknown>\"")
+		require.Nil(t, actChunk)
+
+		return nil
+	}))
+}
+
 // TestOldChunkDiskMapper_Truncate tests
 // * If truncation is happening properly based on the time passed.
 // * The active file is not deleted even if the passed time makes it eligible to be deleted.
@@ -222,7 +261,9 @@ func TestOldChunkDiskMapper_Truncate(t *testing.T) {
 	require.NoError(t, err)
 
 	require.False(t, hrw.fileMaxtSet)
-	require.NoError(t, hrw.IterateAllChunks(func(_ HeadSeriesRef, _ ChunkDiskMapperRef, _, _ int64, _ uint16) error { return nil }))
+	require.NoError(t, hrw.IterateAllChunks(func(_ HeadSeriesRef, _ ChunkDiskMapperRef, _, _ int64, _ uint16, _ chunkenc.Encoding) error {
+		return nil
+	}))
 	require.True(t, hrw.fileMaxtSet)
 
 	verifyFiles([]int{3, 4, 5, 6, 7, 8})
@@ -338,7 +379,7 @@ func TestOldChunkDiskMapper_TruncateAfterFailedIterateChunks(t *testing.T) {
 	require.NoError(t, err)
 
 	// Forcefully failing IterateAllChunks.
-	require.Error(t, hrw.IterateAllChunks(func(_ HeadSeriesRef, _ ChunkDiskMapperRef, _, _ int64, _ uint16) error {
+	require.Error(t, hrw.IterateAllChunks(func(_ HeadSeriesRef, _ ChunkDiskMapperRef, _, _ int64, _ uint16, _ chunkenc.Encoding) error {
 		return errors.New("random error")
 	}))
 
@@ -395,7 +436,9 @@ func TestOldChunkDiskMapper_ReadRepairOnEmptyLastFile(t *testing.T) {
 	hrw, err = NewOldChunkDiskMapper(dir, chunkenc.NewPool(), DefaultWriteBufferSize)
 	require.NoError(t, err)
 	require.False(t, hrw.fileMaxtSet)
-	require.NoError(t, hrw.IterateAllChunks(func(_ HeadSeriesRef, _ ChunkDiskMapperRef, _, _ int64, _ uint16) error { return nil }))
+	require.NoError(t, hrw.IterateAllChunks(func(_ HeadSeriesRef, _ ChunkDiskMapperRef, _, _ int64, _ uint16, _ chunkenc.Encoding) error {
+		return nil
+	}))
 	require.True(t, hrw.fileMaxtSet)
 
 	// Removed from memory.
@@ -425,7 +468,9 @@ func testOldChunkDiskMapper(t *testing.T) *OldChunkDiskMapper {
 	hrw, err := NewOldChunkDiskMapper(tmpdir, chunkenc.NewPool(), DefaultWriteBufferSize)
 	require.NoError(t, err)
 	require.False(t, hrw.fileMaxtSet)
-	require.NoError(t, hrw.IterateAllChunks(func(_ HeadSeriesRef, _ ChunkDiskMapperRef, _, _ int64, _ uint16) error { return nil }))
+	require.NoError(t, hrw.IterateAllChunks(func(_ HeadSeriesRef, _ ChunkDiskMapperRef, _, _ int64, _ uint16, _ chunkenc.Encoding) error {
+		return nil
+	}))
 	require.True(t, hrw.fileMaxtSet)
 	return hrw
 }
diff --git a/tsdb/head.go b/tsdb/head.go
index 6aeef2e50e..21eb241528 100644
--- a/tsdb/head.go
+++ b/tsdb/head.go
@@ -61,7 +61,7 @@ var (
 // 0 size queue to queue based chunk disk mapper.
 type chunkDiskMapper interface {
 	CutNewFile() (returnErr error)
-	IterateAllChunks(f func(seriesRef chunks.HeadSeriesRef, chunkRef chunks.ChunkDiskMapperRef, mint, maxt int64, numSamples uint16) error) (err error)
+	IterateAllChunks(f func(seriesRef chunks.HeadSeriesRef, chunkRef chunks.ChunkDiskMapperRef, mint, maxt int64, numSamples uint16, encoding chunkenc.Encoding) error) (err error)
 	Truncate(mint int64) error
 	DeleteCorrupted(originalErr error) error
 	Size() (int64, error)
@@ -662,10 +662,16 @@ func (h *Head) Init(minValidTime int64) error {
 
 func (h *Head) loadMmappedChunks(refSeries map[chunks.HeadSeriesRef]*memSeries) (map[chunks.HeadSeriesRef][]*mmappedChunk, error) {
 	mmappedChunks := map[chunks.HeadSeriesRef][]*mmappedChunk{}
-	if err := h.chunkDiskMapper.IterateAllChunks(func(seriesRef chunks.HeadSeriesRef, chunkRef chunks.ChunkDiskMapperRef, mint, maxt int64, numSamples uint16) error {
+	if err := h.chunkDiskMapper.IterateAllChunks(func(seriesRef chunks.HeadSeriesRef, chunkRef chunks.ChunkDiskMapperRef, mint, maxt int64, numSamples uint16, encoding chunkenc.Encoding) error {
 		if maxt < h.minValidTime.Load() {
 			return nil
 		}
+
+		// We ignore any chunk that doesn't have a valid encoding.
+		if encoding != chunkenc.EncXOR {
+			return nil
+		}
+
 		ms, ok := refSeries[seriesRef]
 		if !ok {
 			slice := mmappedChunks[seriesRef]
diff --git a/tsdb/head_test.go b/tsdb/head_test.go
index 8c882ab7b2..755f17df61 100644
--- a/tsdb/head_test.go
+++ b/tsdb/head_test.go
@@ -65,7 +65,9 @@ func newTestHead(t testing.TB, chunkRange int64, compressWAL bool) (*Head, *wal.
 	h, err := NewHead(nil, nil, wlog, opts, nil)
 	require.NoError(t, err)
-	require.NoError(t, h.chunkDiskMapper.IterateAllChunks(func(_ chunks.HeadSeriesRef, _ chunks.ChunkDiskMapperRef, _, _ int64, _ uint16) error { return nil }))
+	require.NoError(t, h.chunkDiskMapper.IterateAllChunks(func(_ chunks.HeadSeriesRef, _ chunks.ChunkDiskMapperRef, _, _ int64, _ uint16, _ chunkenc.Encoding) error {
+		return nil
+	}))
 
 	t.Cleanup(func() {
 		require.NoError(t, os.RemoveAll(dir))
@@ -3211,3 +3213,79 @@ func TestChunkSnapshotTakenAfterIncompleteSnapshot(t *testing.T) {
 	require.Equal(t, 0, idx)
 	require.Greater(t, offset, 0)
 }
+
+func TestHeadInit_DiscardChunksWithUnsupportedEncoding(t *testing.T) {
+	h, _ := newTestHead(t, 1000, false)
+	defer func() {
+		require.NoError(t, h.Close())
+	}()
+
+	require.NoError(t, h.Init(0))
+
+	ctx := context.Background()
+	app := h.Appender(ctx)
+	seriesLabels := labels.FromStrings("a", "1")
+	var seriesRef storage.SeriesRef
+	var err error
+	for i := 0; i < 400; i++ {
+		seriesRef, err = app.Append(0, seriesLabels, int64(i), float64(i))
+		require.NoError(t, err)
+	}
+
+	require.NoError(t, app.Commit())
+	require.Greater(t, prom_testutil.ToFloat64(h.metrics.chunksCreated), 1.0)
+
+	uc := newUnsupportedChunk()
+	// Make this chunk not overlap with the previous and the next chunks.
+	h.chunkDiskMapper.WriteChunk(chunks.HeadSeriesRef(seriesRef), 500, 600, uc, func(err error) { require.NoError(t, err) })
+
+	app = h.Appender(ctx)
+	for i := 700; i < 1200; i++ {
+		_, err := app.Append(0, seriesLabels, int64(i), float64(i))
+		require.NoError(t, err)
+	}
+
+	require.NoError(t, app.Commit())
+	require.Greater(t, prom_testutil.ToFloat64(h.metrics.chunksCreated), 4.0)
+
+	series, created, err := h.getOrCreate(seriesLabels.Hash(), seriesLabels)
+	require.NoError(t, err)
+	require.False(t, created, "should already exist")
+	require.NotNil(t, series, "should return the series we created above")
+
+	expChunks := make([]*mmappedChunk, len(series.mmappedChunks))
+	copy(expChunks, series.mmappedChunks)
+
+	require.NoError(t, h.Close())
+
+	wlog, err := wal.NewSize(nil, nil, filepath.Join(h.opts.ChunkDirRoot, "wal"), 32768, false)
+	require.NoError(t, err)
+	h, err = NewHead(nil, nil, wlog, h.opts, nil)
+	require.NoError(t, err)
+	require.NoError(t, h.Init(0))
+
+	series, created, err = h.getOrCreate(seriesLabels.Hash(), seriesLabels)
+	require.NoError(t, err)
+	require.False(t, created, "should already exist")
+	require.NotNil(t, series, "should return the series we created above")
+
+	require.Equal(t, expChunks, series.mmappedChunks)
+}
+
+const (
+	UnsupportedMask   = 0b10000000
+	EncUnsupportedXOR = chunkenc.EncXOR | UnsupportedMask
+)
+
+// unsupportedChunk holds an XORChunk and overrides the Encoding() method.
+type unsupportedChunk struct {
+	*chunkenc.XORChunk
+}
+
+func newUnsupportedChunk() *unsupportedChunk {
+	return &unsupportedChunk{chunkenc.NewXORChunk()}
+}
+
+func (c *unsupportedChunk) Encoding() chunkenc.Encoding {
+	return EncUnsupportedXOR
+}
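
Note on the masking trick the tests above rely on: OR-ing a bit that no shipped encoding occupies into a valid encoding byte yields a value that every reader must treat as unknown, and the new encoding parameter on the IterateAllChunks callback is what lets Head.loadMmappedChunks silently drop such chunks at startup instead of failing. Below is a minimal, self-contained Go sketch of that filtering idea; the Encoding type and constants here are illustrative stand-ins, not the real chunkenc API.

package main

import "fmt"

// Encoding stands in for chunkenc.Encoding: a single byte stored in front of
// each chunk's data in a head chunk file.
type Encoding byte

const (
	EncNone Encoding = iota
	EncXOR
)

// Setting a high bit that no known encoding uses produces a value readers
// must treat as unknown, mirroring UnsupportedMask/EncUnsupportedXOR above.
const (
	UnsupportedMask            = 0b10000000
	EncUnsupportedXOR Encoding = EncXOR | UnsupportedMask
)

func main() {
	encodings := []Encoding{EncXOR, EncUnsupportedXOR, EncXOR}
	kept := 0
	for _, enc := range encodings {
		// As in Head.loadMmappedChunks: skip chunks whose encoding is not
		// recognized instead of aborting the load.
		if enc != EncXOR {
			continue
		}
		kept++
	}
	fmt.Printf("kept %d of %d chunks\n", kept, len(encodings))
}

Running this prints "kept 2 of 3 chunks": the chunk carrying the masked encoding is skipped, which is the behavior the TestHeadInit_DiscardChunksWithUnsupportedEncoding test asserts after restart.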