From 1d9c00177284263075369005ec1632a2ea26f169 Mon Sep 17 00:00:00 2001 From: alanprot Date: Thu, 22 Aug 2024 15:59:07 -0700 Subject: [PATCH] Test to reproduce the deadlock Signed-off-by: alanprot --- tsdb/db_test.go | 60 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 60 insertions(+) diff --git a/tsdb/db_test.go b/tsdb/db_test.go index 4e3a077f6a..8c509c2852 100644 --- a/tsdb/db_test.go +++ b/tsdb/db_test.go @@ -3726,6 +3726,66 @@ func testChunkQuerierShouldNotPanicIfHeadChunkIsTruncatedWhileReadingQueriedChun } } +func TestMMapHeadChunksAndCompactConcurrently(t *testing.T) { + opts := DefaultOptions() + ctx := context.Background() + blockDuration := int64(1000) + opts.MinBlockDuration = blockDuration + opts.MaxBlockDuration = blockDuration + opts.OutOfOrderTimeWindow = 3 * DefaultBlockDuration + db := openTestDB(t, opts, nil) + db.DisableCompactions() + defer func() { + require.NoError(t, db.Close()) + }() + app := db.Appender(ctx) + for j := 0; j < 50000; j++ { + _, err := app.Append(0, labels.FromStrings(labels.MetricName, fmt.Sprintf("metric_to_be_gced_%v", j)), 0, float64(0)) + require.NoError(t, err) + } + + // Adding one series with 2x block duration to make the head compactable. + _, err := app.Append(0, labels.FromStrings(labels.MetricName, "metric"), 2*blockDuration, float64(0)) + require.NoError(t, err) + require.NoError(t, app.Commit()) + done := make(chan struct{}) + + // Create 2 go routines to Compact the head and ForceHeadMMap at the same time + go func() { + defer close(done) + err := db.Compact(ctx) + require.NoError(t, err) + }() + + go func() { + for { + select { + case <-done: + return + default: + db.ForceHeadMMap() + } + } + }() + + timeout := time.After(time.Second * 10) + for { + select { + case <-done: + return + case <-timeout: + t.Fatalf("timeout waiting for compaction") + return + default: + // Lets also keep creating new series so we make sure the stripped series are changing and there is no race + app := db.Appender(ctx) + _, err := app.Append(0, labels.FromStrings(labels.MetricName, fmt.Sprintf("newSeries_%v", rand.Int63())), 2*blockDuration, float64(0)) + require.NoError(t, err) + require.NoError(t, app.Commit()) + } + } +} + func TestQuerierShouldNotFailIfOOOCompactionOccursAfterRetrievingQuerier(t *testing.T) { opts := DefaultOptions() opts.OutOfOrderTimeWindow = 3 * DefaultBlockDuration