diff --git a/tsdb/chunks/head_chunks.go b/tsdb/chunks/head_chunks.go
index d5386f7ea..bc6915c80 100644
--- a/tsdb/chunks/head_chunks.go
+++ b/tsdb/chunks/head_chunks.go
@@ -556,7 +556,14 @@ func (cdm *ChunkDiskMapper) Chunk(ref uint64) (chunkenc.Chunk, error) {
 
 	// The chunk data itself.
 	chkData := mmapFile.byteSlice.Range(chkDataEnd-int(chkDataLen), chkDataEnd)
-	chk, err := cdm.pool.Get(chunkenc.Encoding(chkEnc), chkData)
+
+	// Make a copy of the chunk data to prevent a panic: the returned slice would
+	// otherwise reference an mmap-ed file, which could be closed after the function
+	// returns but while the chunk is still in use.
+	chkDataCopy := make([]byte, len(chkData))
+	copy(chkDataCopy, chkData)
+
+	chk, err := cdm.pool.Get(chunkenc.Encoding(chkEnc), chkDataCopy)
 	if err != nil {
 		return nil, &CorruptionErr{
 			Dir: cdm.dir.Name(),
diff --git a/tsdb/db_test.go b/tsdb/db_test.go
index 502f2c556..8c0808e90 100644
--- a/tsdb/db_test.go
+++ b/tsdb/db_test.go
@@ -27,6 +27,7 @@ import (
 	"path/filepath"
 	"sort"
 	"strconv"
+	"strings"
 	"sync"
 	"testing"
 	"time"
@@ -41,6 +42,7 @@ import (
 	"github.com/prometheus/prometheus/pkg/labels"
 	"github.com/prometheus/prometheus/storage"
+	"github.com/prometheus/prometheus/tsdb/chunkenc"
 	"github.com/prometheus/prometheus/tsdb/chunks"
 	"github.com/prometheus/prometheus/tsdb/fileutil"
 	"github.com/prometheus/prometheus/tsdb/index"
@@ -3122,3 +3124,259 @@ func TestNoPanicOnTSDBOpenError(t *testing.T) {
 
 	require.NoError(t, lockf.Release())
 }
+
+func TestQuerier_ShouldNotPanicIfHeadChunkIsTruncatedWhileReadingQueriedChunks(t *testing.T) {
+	t.Skip("TODO: investigate why the process crashes in CI")
+
+	const numRuns = 5
+
+	for i := 1; i <= numRuns; i++ {
+		t.Run(strconv.Itoa(i), func(t *testing.T) {
+			testQuerierShouldNotPanicIfHeadChunkIsTruncatedWhileReadingQueriedChunks(t)
+		})
+	}
+}
+
+func testQuerierShouldNotPanicIfHeadChunkIsTruncatedWhileReadingQueriedChunks(t *testing.T) {
+	const (
+		numSeries                = 1000
+		numStressIterations      = 10000
+		minStressAllocationBytes = 128 * 1024
+		maxStressAllocationBytes = 512 * 1024
+	)
+
+	db := openTestDB(t, nil, nil)
+	defer func() {
+		require.NoError(t, db.Close())
+	}()
+
+	// Disable compactions so we can control them.
+	db.DisableCompactions()
+
+	// Generate the metrics we're going to append.
+	metrics := make([]labels.Labels, 0, numSeries)
+	for i := 0; i < numSeries; i++ {
+		metrics = append(metrics, labels.Labels{{Name: labels.MetricName, Value: fmt.Sprintf("test_%d", i)}})
+	}
+
+	// Push 1 sample every 15s for 2x the block duration period.
+	ctx := context.Background()
+	interval := int64(15 * time.Second / time.Millisecond)
+	ts := int64(0)
+
+	for ; ts < 2*DefaultBlockDuration; ts += interval {
+		app := db.Appender(ctx)
+
+		for _, metric := range metrics {
+			_, err := app.Append(0, metric, ts, float64(ts))
+			require.NoError(t, err)
+		}
+
+		require.NoError(t, app.Commit())
+	}
+
+	// Compact the TSDB head for the first time. We expect the head chunks file to have been cut.
+	require.NoError(t, db.Compact())
+	require.Equal(t, float64(1), prom_testutil.ToFloat64(db.Head().metrics.headTruncateTotal))
+
+	// Push more samples for another 1x block duration period.
+	for ; ts < 3*DefaultBlockDuration; ts += interval {
+		app := db.Appender(ctx)
+
+		for _, metric := range metrics {
+			_, err := app.Append(0, metric, ts, float64(ts))
+			require.NoError(t, err)
+		}
+
+		require.NoError(t, app.Commit())
+	}
+
+	// At this point we expect 2 mmap-ed head chunks.
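+	// The querier opened below will hold references to these mmap-ed chunks: the
+	// second head compaction further down truncates them while they are still being
+	// read, which is exactly the scenario this test exercises.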
+
+	// Get a querier and make sure it's closed only once the test is over.
+	querier, err := db.Querier(ctx, 0, math.MaxInt64)
+	require.NoError(t, err)
+	defer func() {
+		require.NoError(t, querier.Close())
+	}()
+
+	// Query back all series.
+	hints := &storage.SelectHints{Start: 0, End: math.MaxInt64, Step: interval}
+	seriesSet := querier.Select(true, hints, labels.MustNewMatcher(labels.MatchRegexp, labels.MetricName, ".+"))
+
+	// Fetch sample iterators from all series.
+	var iterators []chunkenc.Iterator
+	actualSeries := 0
+	for seriesSet.Next() {
+		actualSeries++
+
+		// Get the iterator and call Next() so that we're sure the chunk is loaded.
+		it := seriesSet.At().Iterator()
+		it.Next()
+		it.At()
+
+		iterators = append(iterators, it)
+	}
+	require.NoError(t, seriesSet.Err())
+	require.Equal(t, numSeries, actualSeries)
+
+	// Compact the TSDB head again.
+	require.NoError(t, db.Compact())
+	require.Equal(t, float64(2), prom_testutil.ToFloat64(db.Head().metrics.headTruncateTotal))
+
+	// At this point we expect 1 head chunk to have been deleted.
+
+	// Stress the memory to trigger GC runs. This is required to increase the chances
+	// that the chunk memory area is released to the kernel.
+	var buf []byte
+	for i := 0; i < numStressIterations; i++ {
+		//nolint:staticcheck
+		buf = append(buf, make([]byte, minStressAllocationBytes+rand.Int31n(maxStressAllocationBytes-minStressAllocationBytes))...)
+		if i%1000 == 0 {
+			buf = nil
+		}
+	}
+
+	// Iterate the samples, summing the values so the Go compiler can't optimize
+	// the it.At() calls away.
+	var sum float64
+	var firstErr error
+	for _, it := range iterators {
+		for it.Next() {
+			_, v := it.At()
+			sum += v
+		}
+
+		if err := it.Err(); err != nil {
+			firstErr = err
+		}
+	}
+
+	// After iterating all samples, make sure that either no error occurred or that
+	// it was the expected "cannot populate chunk XXX: not found" error. That error
+	// can occur when an iterator tries to fetch a head chunk which has been offloaded
+	// by the head compaction in the meantime.
+	if firstErr != nil && !strings.Contains(firstErr.Error(), "cannot populate chunk") {
+		t.Fatalf("unexpected error: %s", firstErr.Error())
+	}
+}
+
+func TestChunkQuerier_ShouldNotPanicIfHeadChunkIsTruncatedWhileReadingQueriedChunks(t *testing.T) {
+	t.Skip("TODO: investigate why the process crashes in CI")
+
+	const numRuns = 5
+
+	for i := 1; i <= numRuns; i++ {
+		t.Run(strconv.Itoa(i), func(t *testing.T) {
+			testChunkQuerierShouldNotPanicIfHeadChunkIsTruncatedWhileReadingQueriedChunks(t)
+		})
+	}
+}
+
+func testChunkQuerierShouldNotPanicIfHeadChunkIsTruncatedWhileReadingQueriedChunks(t *testing.T) {
+	const (
+		numSeries                = 1000
+		numStressIterations      = 10000
+		minStressAllocationBytes = 128 * 1024
+		maxStressAllocationBytes = 512 * 1024
+	)
+
+	db := openTestDB(t, nil, nil)
+	defer func() {
+		require.NoError(t, db.Close())
+	}()
+
+	// Disable compactions so we can control them.
+	db.DisableCompactions()
+
+	// Generate the metrics we're going to append.
+	metrics := make([]labels.Labels, 0, numSeries)
+	for i := 0; i < numSeries; i++ {
+		metrics = append(metrics, labels.Labels{{Name: labels.MetricName, Value: fmt.Sprintf("test_%d", i)}})
+	}
+
+	// Push 1 sample every 15s for 2x the block duration period.
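+	// Note that TSDB timestamps are expressed in milliseconds, hence the conversion
+	// of the 15s interval below.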
+	ctx := context.Background()
+	interval := int64(15 * time.Second / time.Millisecond)
+	ts := int64(0)
+
+	for ; ts < 2*DefaultBlockDuration; ts += interval {
+		app := db.Appender(ctx)
+
+		for _, metric := range metrics {
+			_, err := app.Append(0, metric, ts, float64(ts))
+			require.NoError(t, err)
+		}
+
+		require.NoError(t, app.Commit())
+	}
+
+	// Compact the TSDB head for the first time. We expect the head chunks file to have been cut.
+	require.NoError(t, db.Compact())
+	require.Equal(t, float64(1), prom_testutil.ToFloat64(db.Head().metrics.headTruncateTotal))
+
+	// Push more samples for another 1x block duration period.
+	for ; ts < 3*DefaultBlockDuration; ts += interval {
+		app := db.Appender(ctx)
+
+		for _, metric := range metrics {
+			_, err := app.Append(0, metric, ts, float64(ts))
+			require.NoError(t, err)
+		}
+
+		require.NoError(t, app.Commit())
+	}
+
+	// At this point we expect 2 mmap-ed head chunks.
+
+	// Get a querier and make sure it's closed only once the test is over.
+	querier, err := db.ChunkQuerier(ctx, 0, math.MaxInt64)
+	require.NoError(t, err)
+	defer func() {
+		require.NoError(t, querier.Close())
+	}()
+
+	// Query back all series.
+	hints := &storage.SelectHints{Start: 0, End: math.MaxInt64, Step: interval}
+	seriesSet := querier.Select(true, hints, labels.MustNewMatcher(labels.MatchRegexp, labels.MetricName, ".+"))
+
+	// Iterate all series and get their chunks.
+	var chunks []chunkenc.Chunk
+	actualSeries := 0
+	for seriesSet.Next() {
+		actualSeries++
+		for it := seriesSet.At().Iterator(); it.Next(); {
+			chunks = append(chunks, it.At().Chunk)
+		}
+	}
+	require.NoError(t, seriesSet.Err())
+	require.Equal(t, numSeries, actualSeries)
+
+	// Compact the TSDB head again.
+	require.NoError(t, db.Compact())
+	require.Equal(t, float64(2), prom_testutil.ToFloat64(db.Head().metrics.headTruncateTotal))
+
+	// At this point we expect 1 head chunk to have been deleted.
+
+	// Stress the memory to trigger GC runs. This is required to increase the chances
+	// that the chunk memory area is released to the kernel.
+	var buf []byte
+	for i := 0; i < numStressIterations; i++ {
+		//nolint:staticcheck
+		buf = append(buf, make([]byte, minStressAllocationBytes+rand.Int31n(maxStressAllocationBytes-minStressAllocationBytes))...)
+		if i%1000 == 0 {
+			buf = nil
+		}
+	}
+
+	// Iterate the chunks and read their byte slices. We compute the CRC32 just as a
+	// way to read every byte: the reason for reading the data doesn't matter, we only
+	// need to touch it to make sure the memory backing the []byte is still valid.
+	chkCRC32 := newCRC32()
+	for _, chunk := range chunks {
+		chkCRC32.Reset()
+		_, err := chkCRC32.Write(chunk.Bytes())
+		require.NoError(t, err)
+	}
+}
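
Reviewer note: the hazard the head_chunks.go hunk guards against can be reproduced in isolation. Below is a minimal, Unix-only sketch (not part of the patch; the file name and payload are made up for illustration) showing that a []byte returned by mmap is only valid while the mapping is alive, and that copying the bytes out, as the patch does, keeps them usable after the file is unmapped:

package main

import (
	"fmt"
	"os"
	"syscall"
)

func main() {
	// Create a throwaway file standing in for a head chunks file.
	f, err := os.CreateTemp("", "chunks-demo")
	if err != nil {
		panic(err)
	}
	defer os.Remove(f.Name())

	payload := []byte("chunk-bytes")
	if _, err := f.Write(payload); err != nil {
		panic(err)
	}

	// Map the file into memory, like the ChunkDiskMapper does with chunk files.
	data, err := syscall.Mmap(int(f.Fd()), 0, len(payload), syscall.PROT_READ, syscall.MAP_SHARED)
	if err != nil {
		panic(err)
	}

	// The fix: copy the bytes out of the mapping while it is still alive.
	dataCopy := make([]byte, len(data))
	copy(dataCopy, data)

	// Unmapping invalidates `data`: dereferencing it from here on can fault
	// (SIGSEGV/SIGBUS), which is the crash the patch prevents.
	if err := syscall.Munmap(data); err != nil {
		panic(err)
	}
	if err := f.Close(); err != nil {
		panic(err)
	}

	// The copy lives on the Go heap and remains valid.
	fmt.Printf("copied chunk data: %q\n", dataCopy)
}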