diff --git a/tsdb/head.go b/tsdb/head.go index 9d81b24ae..3f7c552c3 100644 --- a/tsdb/head.go +++ b/tsdb/head.go @@ -128,6 +128,7 @@ type Head struct { writeNotified wlog.WriteNotified memTruncationInProcess atomic.Bool + memTruncationCallBack func() } type ExemplarStorage interface { @@ -1147,6 +1148,10 @@ func (h *Head) truncateMemory(mint int64) (err error) { h.memTruncationInProcess.Store(true) defer h.memTruncationInProcess.Store(false) + if h.memTruncationCallBack != nil { + h.memTruncationCallBack() + } + // We wait for pending queries to end that overlap with this truncation. if initialized { h.WaitForPendingReadersInTimeRange(h.MinTime(), mint) diff --git a/tsdb/head_test.go b/tsdb/head_test.go index 90fabdb7a..24402a9e7 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -3506,12 +3506,13 @@ func TestWaitForPendingReadersInTimeRange(t *testing.T) { } func TestQueryOOOHeadDuringTruncate(t *testing.T) { - const maxT int64 = 4000 + const maxT int64 = 6000 dir := t.TempDir() opts := DefaultOptions() opts.EnableNativeHistograms = true opts.OutOfOrderTimeWindow = maxT + opts.MinBlockDuration = maxT / 2 // So that head will compact up to 3000. db, err := Open(dir, nil, nil, opts, nil) require.NoError(t, err) @@ -3538,49 +3539,57 @@ func TestQueryOOOHeadDuringTruncate(t *testing.T) { requireEqualOOOSamples(t, int(maxT/100-1), db) - // This mocks truncation. - db.head.memTruncationInProcess.Store(true) - db.head.lastMemoryTruncationTime.Store(3000) + // Synchronization points. + allowQueryToStart := make(chan struct{}) + queryStarted := make(chan struct{}) + compactionFinished := make(chan struct{}) - t.Run("LabelNames", func(t *testing.T) { - // Query the head and overlap with the truncation time. - q, err := db.Querier(1500, 2500) - require.NoError(t, err) - ctx := context.Background() - res, annots, err := q.LabelNames(ctx, nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b")) - require.NoError(t, err) - require.Empty(t, annots) - require.Equal(t, []string{"a"}, res) - require.NoError(t, q.Close()) - }) + db.head.memTruncationCallBack = func() { + // Compaction has started, let the query start and wait for it to actually start to simulate race condition. + allowQueryToStart <- struct{}{} + <-queryStarted + } - t.Run("LabelValues", func(t *testing.T) { - // Query the head and overlap with the truncation time. - q, err := db.Querier(1500, 2500) - require.NoError(t, err) - ctx := context.Background() - res, annots, err := q.LabelValues(ctx, "a", nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b")) - require.NoError(t, err) - require.Empty(t, annots) - require.Equal(t, []string{"b"}, res) - require.NoError(t, q.Close()) - }) + go func() { + db.Compact(context.Background()) // Compact and write blocks up to 3000 (maxtT/2). + compactionFinished <- struct{}{} + }() - t.Run("Select", func(t *testing.T) { - // Query the head and overlap with the truncation time. - q, err := db.Querier(1500, 2500) - require.NoError(t, err) - ctx := context.Background() - ss := q.Select(ctx, false, nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b")) - require.True(t, ss.Next()) - s := ss.At() - require.False(t, ss.Next()) // One series. - it := s.Iterator(nil) - require.NotEqual(t, chunkenc.ValNone, it.Next()) // Has some data. - require.Equal(t, int64(1550), it.AtT()) // It's from OOO head. - require.NoError(t, it.Err()) - require.NoError(t, q.Close()) - }) + // Wait for the compaction to start. + <-allowQueryToStart + + q, err := db.Querier(1500, 2500) + require.NoError(t, err) + queryStarted <- struct{}{} // Unblock the compaction. + ctx := context.Background() + + // Label names. + res, annots, err := q.LabelNames(ctx, nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b")) + require.NoError(t, err) + require.Empty(t, annots) + require.Equal(t, []string{"a"}, res) + + // Label values. + res, annots, err = q.LabelValues(ctx, "a", nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b")) + require.NoError(t, err) + require.Empty(t, annots) + require.Equal(t, []string{"b"}, res) + + // Samples + ss := q.Select(ctx, false, nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b")) + require.True(t, ss.Next()) + s := ss.At() + require.False(t, ss.Next()) // One series. + it := s.Iterator(nil) + require.NotEqual(t, chunkenc.ValNone, it.Next()) // Has some data. + require.Equal(t, int64(1500), it.AtT()) // It is an in-order sample. + require.NotEqual(t, chunkenc.ValNone, it.Next()) // Has some data. + require.Equal(t, int64(1550), it.AtT()) // it is an out-of-order sample. + require.NoError(t, it.Err()) + + require.NoError(t, q.Close()) // Cannot be deferred as the compaction waits for queries to close before finishing. + + <-compactionFinished // Wait for compaction otherwise Go test finds stray goroutines. } func TestAppendHistogram(t *testing.T) {