From 5642fac9fde14b3aa7c5617d38ddaae8cd6c4ac2 Mon Sep 17 00:00:00 2001
From: Lukasz Mierzwa
Date: Wed, 4 Dec 2024 10:50:38 +0000
Subject: [PATCH] Don't panic if ActiveQueryTracker is closed

When ActiveQueryTracker is closed it closes the mmap-ed file it writes
to, so any query logged after that point will cause a panic. This
change tracks whether ActiveQueryTracker has been closed and, once it
has, stops trying to write to the mmap-ed file.

Fixes #15232.

Signed-off-by: Lukasz Mierzwa
---
 promql/query_logger.go      | 13 +++++++
 promql/query_logger_test.go | 70 +++++++++++++++++++++++++++++++++++++
 2 files changed, 83 insertions(+)

diff --git a/promql/query_logger.go b/promql/query_logger.go
index c0a70b66d7..9b7b89cbef 100644
--- a/promql/query_logger.go
+++ b/promql/query_logger.go
@@ -27,9 +27,11 @@ import (
 	"unicode/utf8"
 
 	"github.com/edsrzf/mmap-go"
+	"go.uber.org/atomic"
 )
 
 type ActiveQueryTracker struct {
+	isRunning     *atomic.Bool
 	mmappedFile   []byte
 	getNextIndex  chan int
 	logger        *slog.Logger
@@ -145,8 +147,12 @@ func NewActiveQueryTracker(localStoragePath string, maxConcurrent int, logger *s
 		panic("Unable to create mmap-ed active query log")
 	}
 
+	isRunning := &atomic.Bool{}
+	isRunning.Store(true)
+
 	copy(fileAsBytes, "[")
 	activeQueryTracker := ActiveQueryTracker{
+		isRunning:    isRunning,
 		mmappedFile:  fileAsBytes,
 		closer:       closer,
 		getNextIndex: make(chan int, maxConcurrent),
@@ -205,11 +211,17 @@ func (tracker ActiveQueryTracker) GetMaxConcurrent() int {
 }
 
 func (tracker ActiveQueryTracker) Delete(insertIndex int) {
+	if !tracker.isRunning.Load() {
+		return
+	}
 	copy(tracker.mmappedFile[insertIndex:], strings.Repeat("\x00", entrySize))
 	tracker.getNextIndex <- insertIndex
 }
 
 func (tracker ActiveQueryTracker) Insert(ctx context.Context, query string) (int, error) {
+	if !tracker.isRunning.Load() {
+		return 0, errors.New("ActiveQueryTracker is stopped")
+	}
 	select {
 	case i := <-tracker.getNextIndex:
 		fileBytes := tracker.mmappedFile
@@ -229,6 +241,7 @@ func (tracker *ActiveQueryTracker) Close() error {
 	if tracker == nil || tracker.closer == nil {
 		return nil
 	}
+	tracker.isRunning.Store(false)
 	if err := tracker.closer.Close(); err != nil {
 		return fmt.Errorf("close ActiveQueryTracker.closer: %w", err)
 	}
diff --git a/promql/query_logger_test.go b/promql/query_logger_test.go
index eb06e513ef..86af358b7b 100644
--- a/promql/query_logger_test.go
+++ b/promql/query_logger_test.go
@@ -16,16 +16,21 @@ package promql
 import (
 	"context"
 	"os"
+	"path"
 	"path/filepath"
 	"testing"
 
 	"github.com/grafana/regexp"
 	"github.com/stretchr/testify/require"
+	"go.uber.org/atomic"
 )
 
 func TestQueryLogging(t *testing.T) {
 	fileAsBytes := make([]byte, 4096)
+	isRunning := &atomic.Bool{}
+	isRunning.Store(true)
 	queryLogger := ActiveQueryTracker{
+		isRunning:    isRunning,
 		mmappedFile:  fileAsBytes,
 		logger:       nil,
 		getNextIndex: make(chan int, 4),
@@ -69,7 +74,10 @@ func TestQueryLogging(t *testing.T) {
 
 func TestIndexReuse(t *testing.T) {
 	queryBytes := make([]byte, 1+3*entrySize)
+	isRunning := &atomic.Bool{}
+	isRunning.Store(true)
 	queryLogger := ActiveQueryTracker{
+		isRunning:    isRunning,
 		mmappedFile:  queryBytes,
 		logger:       nil,
 		getNextIndex: make(chan int, 3),
@@ -163,3 +171,65 @@ func TestParseBrokenJSON(t *testing.T) {
 		})
 	}
 }
+
+// Closing an ActiveQueryTracker instance shouldn't cause a panic if the engine still tries to log queries.
+func TestActiveQueryTrackerClose(t *testing.T) {
+	dir := t.TempDir()
+
+	maxConcurrent := 10
+	fileAsBytes, closer, err := getMMappedFile(path.Join(dir, "queries.active"), 1+maxConcurrent*entrySize, nil)
+	require.NoError(t, err)
+
+	isRunning := &atomic.Bool{}
+	isRunning.Store(true)
+
+	queryLogger := ActiveQueryTracker{
+		isRunning:     isRunning,
+		maxConcurrent: maxConcurrent,
+		mmappedFile:   fileAsBytes,
+		closer:        closer,
+		logger:        nil,
+		getNextIndex:  make(chan int, 4),
+	}
+
+	queryLogger.generateIndices(4)
+	veryLongString := "MassiveQueryThatNeverEndsAndExceedsTwoHundredBytesWhichIsTheSizeOfEntrySizeAndShouldThusBeTruncatedAndIamJustGoingToRepeatTheSameCharactersAgainProbablyBecauseWeAreStillOnlyHalfWayDoneOrMaybeNotOrMaybeMassiveQueryThatNeverEndsAndExceedsTwoHundredBytesWhichIsTheSizeOfEntrySizeAndShouldThusBeTruncatedAndIamJustGoingToRepeatTheSameCharactersAgainProbablyBecauseWeAreStillOnlyHalfWayDoneOrMaybeNotOrMaybeMassiveQueryThatNeverEndsAndExceedsTwoHundredBytesWhichIsTheSizeOfEntrySizeAndShouldThusBeTruncatedAndIamJustGoingToRepeatTheSameCharactersAgainProbablyBecauseWeAreStillOnlyHalfWayDoneOrMaybeNotOrMaybeMassiveQueryThatNeverEndsAndExceedsTwoHundredBytesWhichIsTheSizeOfEntrySizeAndShouldThusBeTruncatedAndIamJustGoingToRepeatTheSameCharactersAgainProbablyBecauseWeAreStillOnlyHalfWayDoneOrMaybeNotOrMaybeMassiveQueryThatNeverEndsAndExceedsTwoHundredBytesWhichIsTheSizeOfEntrySizeAndShouldThusBeTruncatedAndIamJustGoingToRepeatTheSameCharactersAgainProbablyBecauseWeAreStillOnlyHalfWayDoneOrMaybeNotOrMaybe"
+	queries := []string{
+		"TestQuery",
+		veryLongString,
+		"",
+		"SpecialCharQuery{host=\"2132132\", id=123123}",
+	}
+
+	want := []string{
+		`^{"query":"TestQuery","timestamp_sec":\d+}\x00*,$`,
+		`^{"query":"` + trimStringByBytes(veryLongString, entrySize-40) + `","timestamp_sec":\d+}\x00*,$`,
+		`^{"query":"","timestamp_sec":\d+}\x00*,$`,
+		`^{"query":"SpecialCharQuery{host=\\"2132132\\", id=123123}","timestamp_sec":\d+}\x00*,$`,
+	}
+
+	// Check for inserts of queries.
+	for i := 0; i < 4; i++ {
+		start := 1 + i*entrySize
+		end := start + entrySize
+
+		queryLogger.Insert(context.Background(), queries[i])
+
+		have := string(fileAsBytes[start:end])
+		require.True(t, regexp.MustCompile(want[i]).MatchString(have),
+			"Query not written correctly: %s", queries[i])
+	}
+
+	// Delete the first 2 queries.
+	for i := 0; i < 2; i++ {
+		queryLogger.Delete(1 + i*entrySize)
+	}
+
+	// Close the query logger.
+	require.NoError(t, queryLogger.Close())
+
+	// Try to delete the last 2 queries; the tracker is closed, so this must not panic.
+	for i := 2; i < 4; i++ {
+		queryLogger.Delete(1 + i*entrySize)
+	}
+}
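
Reviewer note, not part of the patch: the sketch below illustrates the behaviour this change guarantees, using only the exported promql API touched by the patch (NewActiveQueryTracker, Insert, Delete, Close). The temp-directory setup, logger wiring, and example queries are illustrative assumptions rather than code from this PR. Before the fix, the calls made after Close() would write to the already-unmapped file and panic; with the fix, Insert returns an error and Delete is a no-op.

package main

import (
	"context"
	"log/slog"
	"os"

	"github.com/prometheus/prometheus/promql"
)

func main() {
	logger := slog.New(slog.NewTextHandler(os.Stderr, nil))

	// The tracker mmaps <dir>/queries.active and records active queries in it.
	dir, err := os.MkdirTemp("", "active-query-tracker")
	if err != nil {
		logger.Error("failed to create temp dir", "err", err)
		return
	}
	defer os.RemoveAll(dir)

	tracker := promql.NewActiveQueryTracker(dir, 4, logger)

	// Log one query while the tracker is still running.
	idx, err := tracker.Insert(context.Background(), "up")
	if err != nil {
		logger.Error("unexpected insert failure", "err", err)
		return
	}

	// Close the tracker, then keep using it, as a shutting-down engine might.
	_ = tracker.Close()

	// With this patch these calls are safe: Delete returns early and Insert
	// reports that the tracker is stopped instead of panicking on the
	// unmapped file.
	tracker.Delete(idx)
	if _, err := tracker.Insert(context.Background(), "sum(rate(http_requests_total[5m]))"); err != nil {
		logger.Info("insert rejected after close", "err", err)
	}
}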