From aabe4d6e4ab39d4d87611668312ec4d93616b5b8 Mon Sep 17 00:00:00 2001 From: Arve Knudsen Date: Mon, 29 Apr 2024 16:16:51 +0200 Subject: [PATCH 1/3] promql.ActiveQueryTracker: Unmap mmapped file when done Signed-off-by: Arve Knudsen --- promql/engine_test.go | 4 +++- promql/query_logger.go | 32 ++++++++++++++++++++++++++++---- promql/query_logger_test.go | 27 ++++++++++++++------------- 3 files changed, 45 insertions(+), 18 deletions(-) diff --git a/promql/engine_test.go b/promql/engine_test.go index cc91855468..485239399d 100644 --- a/promql/engine_test.go +++ b/promql/engine_test.go @@ -59,7 +59,9 @@ func TestQueryConcurrency(t *testing.T) { require.NoError(t, err) defer os.RemoveAll(dir) queryTracker := promql.NewActiveQueryTracker(dir, maxConcurrency, nil) - t.Cleanup(queryTracker.Close) + t.Cleanup(func() { + require.NoError(t, queryTracker.Close()) + }) opts := promql.EngineOpts{ Logger: nil, diff --git a/promql/query_logger.go b/promql/query_logger.go index 7ddd8c2d5a..76528f9584 100644 --- a/promql/query_logger.go +++ b/promql/query_logger.go @@ -16,6 +16,7 @@ package promql import ( "context" "encoding/json" + "fmt" "io" "os" "path/filepath" @@ -36,6 +37,8 @@ type ActiveQueryTracker struct { maxConcurrent int } +var _ io.Closer = &ActiveQueryTracker{} + type Entry struct { Query string `json:"query"` Timestamp int64 `json:"timestamp_sec"` @@ -83,6 +86,23 @@ func logUnfinishedQueries(filename string, filesize int, logger log.Logger) { } } +type mmapedFile struct { + f io.Closer + m mmap.MMap +} + +func (f *mmapedFile) Close() error { + err := f.m.Unmap() + if fErr := f.f.Close(); fErr != nil && err == nil { + return fmt.Errorf("close mmapedFile.f: %w", fErr) + } + + if err != nil { + return fmt.Errorf("mmapedFile: unmapping: %w", err) + } + return nil +} + func getMMapedFile(filename string, filesize int, logger log.Logger) ([]byte, io.Closer, error) { file, err := os.OpenFile(filename, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0o666) if err != nil { @@ -108,7 +128,7 @@ func getMMapedFile(filename string, filesize int, logger log.Logger) ([]byte, io return nil, nil, err } - return fileAsBytes, file, err + return fileAsBytes, &mmapedFile{f: file, m: fileAsBytes}, err } func NewActiveQueryTracker(localStoragePath string, maxConcurrent int, logger log.Logger) *ActiveQueryTracker { @@ -204,9 +224,13 @@ func (tracker ActiveQueryTracker) Insert(ctx context.Context, query string) (int } } -func (tracker *ActiveQueryTracker) Close() { +// Close closes tracker. +func (tracker *ActiveQueryTracker) Close() error { if tracker == nil || tracker.closer == nil { - return + return nil } - tracker.closer.Close() + if err := tracker.closer.Close(); err != nil { + return fmt.Errorf("close ActiveQueryTracker.closer: %w", err) + } + return nil } diff --git a/promql/query_logger_test.go b/promql/query_logger_test.go index 376d61b641..7bd93781ec 100644 --- a/promql/query_logger_test.go +++ b/promql/query_logger_test.go @@ -16,6 +16,7 @@ package promql import ( "context" "os" + "path/filepath" "testing" "github.com/grafana/regexp" @@ -104,26 +105,26 @@ func TestIndexReuse(t *testing.T) { } func TestMMapFile(t *testing.T) { - file, err := os.CreateTemp("", "mmapedFile") + dir := t.TempDir() + fpath := filepath.Join(dir, "mmapedFile") + const data = "ab" + + fileAsBytes, closer, err := getMMapedFile(fpath, 2, nil) require.NoError(t, err) + copy(fileAsBytes, data) + require.NoError(t, closer.Close()) - filename := file.Name() - defer os.Remove(filename) - - fileAsBytes, _, err := getMMapedFile(filename, 2, nil) - - require.NoError(t, err) - copy(fileAsBytes, "ab") - - f, err := os.Open(filename) + f, err := os.Open(fpath) require.NoError(t, err) + t.Cleanup(func() { + _ = f.Close() + }) bytes := make([]byte, 4) n, err := f.Read(bytes) - require.Equal(t, 2, n) require.NoError(t, err, "Unexpected error while reading file.") - - require.Equal(t, fileAsBytes, bytes[:2], "Mmap failed") + require.Equal(t, 2, n) + require.Equal(t, []byte(data), bytes[:2], "Mmap failed") } func TestParseBrokenJSON(t *testing.T) { From 0f01d4b336cb4829475fbee6595041a25d85e594 Mon Sep 17 00:00:00 2001 From: Arve Knudsen Date: Wed, 15 May 2024 21:58:56 +0200 Subject: [PATCH 2/3] Fix flaky test Signed-off-by: Arve Knudsen --- promql/engine_test.go | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/promql/engine_test.go b/promql/engine_test.go index c47ceb2460..f431ab41e8 100644 --- a/promql/engine_test.go +++ b/promql/engine_test.go @@ -21,6 +21,7 @@ import ( "os" "sort" "strconv" + "sync" "testing" "time" @@ -94,9 +95,14 @@ func TestQueryConcurrency(t *testing.T) { return nil } + var wg sync.WaitGroup for i := 0; i < maxConcurrency; i++ { q := engine.NewTestQuery(f) - go q.Exec(ctx) + wg.Add(1) + go func() { + q.Exec(ctx) + wg.Done() + }() select { case <-processing: // Expected. @@ -106,7 +112,11 @@ func TestQueryConcurrency(t *testing.T) { } q := engine.NewTestQuery(f) - go q.Exec(ctx) + wg.Add(1) + go func() { + q.Exec(ctx) + wg.Done() + }() select { case <-processing: @@ -129,6 +139,8 @@ func TestQueryConcurrency(t *testing.T) { for i := 0; i < maxConcurrency; i++ { block <- struct{}{} } + + wg.Wait() } // contextDone returns an error if the context was canceled or timed out. From f3b8750339d65fc25714c8f92f8afacefb6a727d Mon Sep 17 00:00:00 2001 From: Arve Knudsen Date: Mon, 27 May 2024 17:14:17 +0200 Subject: [PATCH 3/3] Join errors Signed-off-by: Arve Knudsen --- promql/query_logger.go | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/promql/query_logger.go b/promql/query_logger.go index 76528f9584..7e06ebb97f 100644 --- a/promql/query_logger.go +++ b/promql/query_logger.go @@ -16,6 +16,7 @@ package promql import ( "context" "encoding/json" + "errors" "fmt" "io" "os" @@ -93,14 +94,14 @@ type mmapedFile struct { func (f *mmapedFile) Close() error { err := f.m.Unmap() - if fErr := f.f.Close(); fErr != nil && err == nil { - return fmt.Errorf("close mmapedFile.f: %w", fErr) + if err != nil { + err = fmt.Errorf("mmapedFile: unmapping: %w", err) + } + if fErr := f.f.Close(); fErr != nil { + return errors.Join(fmt.Errorf("close mmapedFile.f: %w", fErr), err) } - if err != nil { - return fmt.Errorf("mmapedFile: unmapping: %w", err) - } - return nil + return err } func getMMapedFile(filename string, filesize int, logger log.Logger) ([]byte, io.Closer, error) {