Don't panic if ActiveQueryTracker is closed

When ActiveQueryTracker is closed it closes the mmap-ed file it writes to, and so any query logged after that point will cause a panic.
This change tracks when ActiveQueryTracker is closed and if that happens it stops trying to write to mmap-ed file.

Fixes #15232.

Signed-off-by: Lukasz Mierzwa <l.mierzwa@gmail.com>
This commit is contained in:
Lukasz Mierzwa 2024-12-04 10:50:38 +00:00 committed by Lukasz Mierzwa
parent 6a6630d2a7
commit 5642fac9fd
2 changed files with 83 additions and 0 deletions

View file

@ -27,9 +27,11 @@ import (
"unicode/utf8" "unicode/utf8"
"github.com/edsrzf/mmap-go" "github.com/edsrzf/mmap-go"
"go.uber.org/atomic"
) )
type ActiveQueryTracker struct { type ActiveQueryTracker struct {
isRunning *atomic.Bool
mmappedFile []byte mmappedFile []byte
getNextIndex chan int getNextIndex chan int
logger *slog.Logger logger *slog.Logger
@ -145,8 +147,12 @@ func NewActiveQueryTracker(localStoragePath string, maxConcurrent int, logger *s
panic("Unable to create mmap-ed active query log") panic("Unable to create mmap-ed active query log")
} }
isRunning := &atomic.Bool{}
isRunning.Store(true)
copy(fileAsBytes, "[") copy(fileAsBytes, "[")
activeQueryTracker := ActiveQueryTracker{ activeQueryTracker := ActiveQueryTracker{
isRunning: isRunning,
mmappedFile: fileAsBytes, mmappedFile: fileAsBytes,
closer: closer, closer: closer,
getNextIndex: make(chan int, maxConcurrent), getNextIndex: make(chan int, maxConcurrent),
@ -205,11 +211,17 @@ func (tracker ActiveQueryTracker) GetMaxConcurrent() int {
} }
func (tracker ActiveQueryTracker) Delete(insertIndex int) { func (tracker ActiveQueryTracker) Delete(insertIndex int) {
if !tracker.isRunning.Load() {
return
}
copy(tracker.mmappedFile[insertIndex:], strings.Repeat("\x00", entrySize)) copy(tracker.mmappedFile[insertIndex:], strings.Repeat("\x00", entrySize))
tracker.getNextIndex <- insertIndex tracker.getNextIndex <- insertIndex
} }
func (tracker ActiveQueryTracker) Insert(ctx context.Context, query string) (int, error) { func (tracker ActiveQueryTracker) Insert(ctx context.Context, query string) (int, error) {
if !tracker.isRunning.Load() {
return 0, errors.New("ActiveQueryTracker is stopped")
}
select { select {
case i := <-tracker.getNextIndex: case i := <-tracker.getNextIndex:
fileBytes := tracker.mmappedFile fileBytes := tracker.mmappedFile
@ -229,6 +241,7 @@ func (tracker *ActiveQueryTracker) Close() error {
if tracker == nil || tracker.closer == nil { if tracker == nil || tracker.closer == nil {
return nil return nil
} }
tracker.isRunning.Store(false)
if err := tracker.closer.Close(); err != nil { if err := tracker.closer.Close(); err != nil {
return fmt.Errorf("close ActiveQueryTracker.closer: %w", err) return fmt.Errorf("close ActiveQueryTracker.closer: %w", err)
} }

View file

@ -16,16 +16,21 @@ package promql
import ( import (
"context" "context"
"os" "os"
"path"
"path/filepath" "path/filepath"
"testing" "testing"
"github.com/grafana/regexp" "github.com/grafana/regexp"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"go.uber.org/atomic"
) )
func TestQueryLogging(t *testing.T) { func TestQueryLogging(t *testing.T) {
fileAsBytes := make([]byte, 4096) fileAsBytes := make([]byte, 4096)
isRunning := &atomic.Bool{}
isRunning.Store(true)
queryLogger := ActiveQueryTracker{ queryLogger := ActiveQueryTracker{
isRunning: isRunning,
mmappedFile: fileAsBytes, mmappedFile: fileAsBytes,
logger: nil, logger: nil,
getNextIndex: make(chan int, 4), getNextIndex: make(chan int, 4),
@ -69,7 +74,10 @@ func TestQueryLogging(t *testing.T) {
func TestIndexReuse(t *testing.T) { func TestIndexReuse(t *testing.T) {
queryBytes := make([]byte, 1+3*entrySize) queryBytes := make([]byte, 1+3*entrySize)
isRunning := &atomic.Bool{}
isRunning.Store(true)
queryLogger := ActiveQueryTracker{ queryLogger := ActiveQueryTracker{
isRunning: isRunning,
mmappedFile: queryBytes, mmappedFile: queryBytes,
logger: nil, logger: nil,
getNextIndex: make(chan int, 3), getNextIndex: make(chan int, 3),
@ -163,3 +171,65 @@ func TestParseBrokenJSON(t *testing.T) {
}) })
} }
} }
// Closing ActiveQueryTracker instance shouldn't cause panic if the engine still tries to log queries.
func TestActiveQueryTrackerClose(t *testing.T) {
dir := t.TempDir()
maxConcurrent := 10
fileAsBytes, closer, err := getMMappedFile(path.Join(dir, "queries.active"), 1+maxConcurrent*entrySize, nil)
require.NoError(t, err)
isRunning := &atomic.Bool{}
isRunning.Store(true)
queryLogger := ActiveQueryTracker{
isRunning: isRunning,
maxConcurrent: maxConcurrent,
mmappedFile: fileAsBytes,
closer: closer,
logger: nil,
getNextIndex: make(chan int, 4),
}
queryLogger.generateIndices(4)
veryLongString := "MassiveQueryThatNeverEndsAndExceedsTwoHundredBytesWhichIsTheSizeOfEntrySizeAndShouldThusBeTruncatedAndIamJustGoingToRepeatTheSameCharactersAgainProbablyBecauseWeAreStillOnlyHalfWayDoneOrMaybeNotOrMaybeMassiveQueryThatNeverEndsAndExceedsTwoHundredBytesWhichIsTheSizeOfEntrySizeAndShouldThusBeTruncatedAndIamJustGoingToRepeatTheSameCharactersAgainProbablyBecauseWeAreStillOnlyHalfWayDoneOrMaybeNotOrMaybeMassiveQueryThatNeverEndsAndExceedsTwoHundredBytesWhichIsTheSizeOfEntrySizeAndShouldThusBeTruncatedAndIamJustGoingToRepeatTheSameCharactersAgainProbablyBecauseWeAreStillOnlyHalfWayDoneOrMaybeNotOrMaybeMassiveQueryThatNeverEndsAndExceedsTwoHundredBytesWhichIsTheSizeOfEntrySizeAndShouldThusBeTruncatedAndIamJustGoingToRepeatTheSameCharactersAgainProbablyBecauseWeAreStillOnlyHalfWayDoneOrMaybeNotOrMaybeMassiveQueryThatNeverEndsAndExceedsTwoHundredBytesWhichIsTheSizeOfEntrySizeAndShouldThusBeTruncatedAndIamJustGoingToRepeatTheSameCharactersAgainProbablyBecauseWeAreStillOnlyHalfWayDoneOrMaybeNotOrMaybe"
queries := []string{
"TestQuery",
veryLongString,
"",
"SpecialCharQuery{host=\"2132132\", id=123123}",
}
want := []string{
`^{"query":"TestQuery","timestamp_sec":\d+}\x00*,$`,
`^{"query":"` + trimStringByBytes(veryLongString, entrySize-40) + `","timestamp_sec":\d+}\x00*,$`,
`^{"query":"","timestamp_sec":\d+}\x00*,$`,
`^{"query":"SpecialCharQuery{host=\\"2132132\\", id=123123}","timestamp_sec":\d+}\x00*,$`,
}
// Check for inserts of queries.
for i := 0; i < 4; i++ {
start := 1 + i*entrySize
end := start + entrySize
queryLogger.Insert(context.Background(), queries[i])
have := string(fileAsBytes[start:end])
require.True(t, regexp.MustCompile(want[i]).MatchString(have),
"Query not written correctly: %s", queries[i])
}
// Delete first 2 queries
for i := 0; i < 2; i++ {
queryLogger.Delete(1 + i*entrySize)
}
// Close the query logger
require.NoError(t, queryLogger.Close())
// Try to delete last 2 queries
for i := 2; i < 4; i++ {
queryLogger.Delete(1 + i*entrySize)
}
}