Merge pull request #15472 from tjhop/ref/jsonfilelogger-slog-handler

ref: JSONFileLogger slog handler, add scrape.FailureLogger interface
Jan Fajerski 2025-01-27 20:17:46 +01:00 committed by GitHub
commit 54cf0d6879
7 changed files with 176 additions and 118 deletions

View file

@ -47,6 +47,7 @@ import (
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/prometheus/prometheus/util/annotations"
"github.com/prometheus/prometheus/util/logging"
"github.com/prometheus/prometheus/util/stats"
"github.com/prometheus/prometheus/util/zeropool"
)
@ -123,12 +124,13 @@ type QueryEngine interface {
NewRangeQuery(ctx context.Context, q storage.Queryable, opts QueryOpts, qs string, start, end time.Time, interval time.Duration) (Query, error)
}
var _ QueryLogger = (*logging.JSONFileLogger)(nil)
// QueryLogger is an interface that can be used to log all the queries logged
// by the engine.
type QueryLogger interface {
Log(context.Context, slog.Level, string, ...any)
With(args ...any)
Close() error
slog.Handler
io.Closer
}
// A Query is derived from a raw query string and can be run against an engine
@ -628,6 +630,9 @@ func (ng *Engine) exec(ctx context.Context, q *query) (v parser.Value, ws annota
defer func() {
ng.queryLoggerLock.RLock()
if l := ng.queryLogger; l != nil {
logger := slog.New(l)
f := make([]slog.Attr, 0, 16) // Probably enough up front to not need to reallocate on append.
params := make(map[string]interface{}, 4)
params["query"] = q.q
if eq, ok := q.Statement().(*parser.EvalStmt); ok {
@ -636,20 +641,20 @@ func (ng *Engine) exec(ctx context.Context, q *query) (v parser.Value, ws annota
// The step provided by the user is in seconds.
params["step"] = int64(eq.Interval / (time.Second / time.Nanosecond))
}
f := []interface{}{"params", params}
f = append(f, slog.Any("params", params))
if err != nil {
f = append(f, "error", err)
f = append(f, slog.Any("error", err))
}
f = append(f, "stats", stats.NewQueryStats(q.Stats()))
f = append(f, slog.Any("stats", stats.NewQueryStats(q.Stats())))
if span := trace.SpanFromContext(ctx); span != nil {
f = append(f, "spanID", span.SpanContext().SpanID())
f = append(f, slog.Any("spanID", span.SpanContext().SpanID()))
}
if origin := ctx.Value(QueryOrigin{}); origin != nil {
for k, v := range origin.(map[string]interface{}) {
f = append(f, k, v)
f = append(f, slog.Any(k, v))
}
}
l.Log(context.Background(), slog.LevelInfo, "promql query logged", f...)
logger.LogAttrs(context.Background(), slog.LevelInfo, "promql query logged", f...)
// TODO: @tjhop -- do we still need this metric/error log if logger doesn't return errors?
// ng.metrics.queryLogFailures.Inc()
// ng.logger.Error("can't log query", "err", err)
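
With this hunk, QueryLogger stops being a bespoke Log/With/Close interface: anything that is both an slog.Handler and an io.Closer qualifies, and Engine.exec wraps it in a slog.Logger to emit a single structured "promql query logged" record built from []slog.Attr. A minimal sketch of a custom query logger under that contract — the memQueryLogger type and its names are illustrative, not part of this change:

package main

import (
	"bytes"
	"context"
	"fmt"
	"io"
	"log/slog"
)

// memQueryLogger is an illustrative in-memory query logger: an slog.Handler
// plus io.Closer, which is all the new promql.QueryLogger interface asks for.
type memQueryLogger struct {
	buf bytes.Buffer
	slog.Handler
}

// Compile-time check in the same style as the var _ assertion above.
var _ interface {
	slog.Handler
	io.Closer
} = (*memQueryLogger)(nil)

func newMemQueryLogger() *memQueryLogger {
	l := &memQueryLogger{}
	l.Handler = slog.NewJSONHandler(&l.buf, nil)
	return l
}

// Close is a no-op here; a file-backed logger would close its file instead.
func (l *memQueryLogger) Close() error { return nil }

func main() {
	ql := newMemQueryLogger()

	// This mirrors what Engine.exec now does: wrap the handler in a
	// slog.Logger and emit one record with structured attributes.
	logger := slog.New(ql)
	attrs := []slog.Attr{
		slog.Any("params", map[string]interface{}{"query": "up", "step": int64(15)}),
	}
	logger.LogAttrs(context.Background(), slog.LevelInfo, "promql query logged", attrs...)

	fmt.Print(ql.buf.String())
}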

View file

@ -17,8 +17,9 @@ import (
"context"
"errors"
"fmt"
"log/slog"
"math"
"os"
"path/filepath"
"sort"
"strings"
"sync"
@ -38,6 +39,7 @@ import (
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/prometheus/prometheus/util/annotations"
"github.com/prometheus/prometheus/util/logging"
"github.com/prometheus/prometheus/util/stats"
"github.com/prometheus/prometheus/util/testutil"
)
@ -2147,40 +2149,17 @@ func TestSubquerySelector(t *testing.T) {
}
}
type FakeQueryLogger struct {
closed bool
logs []interface{}
attrs []any
}
func getLogLines(t *testing.T, name string) []string {
content, err := os.ReadFile(name)
require.NoError(t, err)
func NewFakeQueryLogger() *FakeQueryLogger {
return &FakeQueryLogger{
closed: false,
logs: make([]interface{}, 0),
attrs: make([]any, 0),
lines := strings.Split(string(content), "\n")
for i := len(lines) - 1; i >= 0; i-- {
if lines[i] == "" {
lines = append(lines[:i], lines[i+1:]...)
}
}
}
// It implements the promql.QueryLogger interface.
func (f *FakeQueryLogger) Close() error {
f.closed = true
return nil
}
// It implements the promql.QueryLogger interface.
func (f *FakeQueryLogger) Log(ctx context.Context, level slog.Level, msg string, args ...any) {
// Test usage only really cares about existence of keyvals passed in
// via args, just append in the log message before handling the
// provided args and any embedded kvs added via `.With()` on f.attrs.
log := append([]any{msg}, args...)
log = append(log, f.attrs...)
f.attrs = f.attrs[:0]
f.logs = append(f.logs, log...)
}
// It implements the promql.QueryLogger interface.
func (f *FakeQueryLogger) With(args ...any) {
f.attrs = append(f.attrs, args...)
return lines
}
func TestQueryLogger_basic(t *testing.T) {
@ -2205,32 +2184,45 @@ func TestQueryLogger_basic(t *testing.T) {
// promql.Query works without query log initialized.
queryExec()
f1 := NewFakeQueryLogger()
tmpDir := t.TempDir()
ql1File := filepath.Join(tmpDir, "query1.log")
f1, err := logging.NewJSONFileLogger(ql1File)
require.NoError(t, err)
engine.SetQueryLogger(f1)
queryExec()
require.Contains(t, f1.logs, `params`)
require.Contains(t, f1.logs, map[string]interface{}{"query": "test statement"})
logLines := getLogLines(t, ql1File)
require.Contains(t, logLines[0], "params", map[string]interface{}{"query": "test statement"})
require.Len(t, logLines, 1)
l := len(f1.logs)
l := len(logLines)
queryExec()
require.Len(t, f1.logs, 2*l)
logLines = getLogLines(t, ql1File)
l2 := len(logLines)
require.Equal(t, l2, 2*l)
// Test that we close the query logger when unsetting it.
require.False(t, f1.closed, "expected f1 to be open, got closed")
// Test that we close the query logger when unsetting it. The following
// attempt to close the file should error.
engine.SetQueryLogger(nil)
require.True(t, f1.closed, "expected f1 to be closed, got open")
err = f1.Close()
require.ErrorContains(t, err, "file already closed", "expected f1 to be closed, got open")
queryExec()
// Test that we close the query logger when swapping.
f2 := NewFakeQueryLogger()
f3 := NewFakeQueryLogger()
ql2File := filepath.Join(tmpDir, "query2.log")
f2, err := logging.NewJSONFileLogger(ql2File)
require.NoError(t, err)
ql3File := filepath.Join(tmpDir, "query3.log")
f3, err := logging.NewJSONFileLogger(ql3File)
require.NoError(t, err)
engine.SetQueryLogger(f2)
require.False(t, f2.closed, "expected f2 to be open, got closed")
queryExec()
engine.SetQueryLogger(f3)
require.True(t, f2.closed, "expected f2 to be closed, got open")
require.False(t, f3.closed, "expected f3 to be open, got closed")
err = f2.Close()
require.ErrorContains(t, err, "file already closed", "expected f2 to be closed, got open")
queryExec()
err = f3.Close()
require.NoError(t, err)
}
func TestQueryLogger_fields(t *testing.T) {
@ -2242,7 +2234,14 @@ func TestQueryLogger_fields(t *testing.T) {
}
engine := promqltest.NewTestEngineWithOpts(t, opts)
f1 := NewFakeQueryLogger()
tmpDir := t.TempDir()
ql1File := filepath.Join(tmpDir, "query1.log")
f1, err := logging.NewJSONFileLogger(ql1File)
require.NoError(t, err)
t.Cleanup(func() {
require.NoError(t, f1.Close())
})
engine.SetQueryLogger(f1)
ctx, cancelCtx := context.WithCancel(context.Background())
@ -2255,8 +2254,8 @@ func TestQueryLogger_fields(t *testing.T) {
res := query.Exec(ctx)
require.NoError(t, res.Err)
require.Contains(t, f1.logs, `foo`)
require.Contains(t, f1.logs, `bar`)
logLines := getLogLines(t, ql1File)
require.Contains(t, logLines[0], "foo", "bar")
}
func TestQueryLogger_error(t *testing.T) {
@ -2268,7 +2267,14 @@ func TestQueryLogger_error(t *testing.T) {
}
engine := promqltest.NewTestEngineWithOpts(t, opts)
f1 := NewFakeQueryLogger()
tmpDir := t.TempDir()
ql1File := filepath.Join(tmpDir, "query1.log")
f1, err := logging.NewJSONFileLogger(ql1File)
require.NoError(t, err)
t.Cleanup(func() {
require.NoError(t, f1.Close())
})
engine.SetQueryLogger(f1)
ctx, cancelCtx := context.WithCancel(context.Background())
@ -2282,10 +2288,9 @@ func TestQueryLogger_error(t *testing.T) {
res := query.Exec(ctx)
require.Error(t, res.Err, "query should have failed")
require.Contains(t, f1.logs, `params`)
require.Contains(t, f1.logs, map[string]interface{}{"query": "test statement"})
require.Contains(t, f1.logs, `error`)
require.Contains(t, f1.logs, testErr)
logLines := getLogLines(t, ql1File)
require.Contains(t, logLines[0], "error", testErr)
require.Contains(t, logLines[0], "params", map[string]interface{}{"query": "test statement"})
}
func TestPreprocessAndWrapWithStepInvariantExpr(t *testing.T) {
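
The tests above swap the in-memory FakeQueryLogger for a real logging.JSONFileLogger on a temp file and then read the file back with the new getLogLines helper. A self-contained sketch of that read-back pattern, with readLogLines standing in for the helper; the test name and package placement are illustrative:

package logging_test

import (
	"log/slog"
	"os"
	"path/filepath"
	"strings"
	"testing"

	"github.com/stretchr/testify/require"

	"github.com/prometheus/prometheus/util/logging"
)

// readLogLines mirrors the getLogLines helper added above: read the whole
// file and drop empty lines.
func readLogLines(t *testing.T, name string) []string {
	t.Helper()
	content, err := os.ReadFile(name)
	require.NoError(t, err)
	var lines []string
	for _, l := range strings.Split(string(content), "\n") {
		if l != "" {
			lines = append(lines, l)
		}
	}
	return lines
}

func TestQueryLogReadBack(t *testing.T) {
	file := filepath.Join(t.TempDir(), "query.log")

	ql, err := logging.NewJSONFileLogger(file)
	require.NoError(t, err)
	t.Cleanup(func() { require.NoError(t, ql.Close()) })

	// The engine performs the equivalent of this once per executed query.
	slog.New(ql).Info("promql query logged", "params", map[string]interface{}{"query": "test statement"})

	lines := readLogLines(t, file)
	require.Len(t, lines, 1)
	require.Contains(t, lines[0], `"query":"test statement"`)
}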

View file

@ -107,7 +107,7 @@ type Manager struct {
scrapeConfigs map[string]*config.ScrapeConfig
scrapePools map[string]*scrapePool
newScrapeFailureLogger func(string) (*logging.JSONFileLogger, error)
scrapeFailureLoggers map[string]*logging.JSONFileLogger
scrapeFailureLoggers map[string]FailureLogger
targetSets map[string][]*targetgroup.Group
buffers *pool.Pool
@ -249,7 +249,7 @@ func (m *Manager) ApplyConfig(cfg *config.Config) error {
}
c := make(map[string]*config.ScrapeConfig)
scrapeFailureLoggers := map[string]*logging.JSONFileLogger{
scrapeFailureLoggers := map[string]FailureLogger{
"": nil, // Emptying the file name sets the scrape logger to nil.
}
for _, scfg := range scfgs {
@ -257,7 +257,7 @@ func (m *Manager) ApplyConfig(cfg *config.Config) error {
if _, ok := scrapeFailureLoggers[scfg.ScrapeFailureLogFile]; !ok {
// We promise to reopen the file on each reload.
var (
logger *logging.JSONFileLogger
logger FailureLogger
err error
)
if m.newScrapeFailureLogger != nil {
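
Because the map now holds the FailureLogger interface instead of the concrete *logging.JSONFileLogger, ApplyConfig keeps its shape: one logger per distinct scrape_failure_log_file, with the empty file name standing for "no logger", reopened on every reload. A condensed, self-contained sketch of that bookkeeping — buildFailureLoggers and the stub factory are hypothetical, not code from this PR:

package main

import (
	"fmt"
	"io"
	"log/slog"
)

// FailureLogger matches the interface added in the scrape package below.
type FailureLogger interface {
	slog.Handler
	io.Closer
}

// buildFailureLoggers is a hypothetical condensation of the ApplyConfig loop
// above: open each distinct failure-log file once per reload, and let the
// empty file name map to nil (scrape failure logging disabled).
func buildFailureLoggers(files []string, newLogger func(string) (FailureLogger, error)) (map[string]FailureLogger, error) {
	loggers := map[string]FailureLogger{
		"": nil,
	}
	for _, f := range files {
		if _, ok := loggers[f]; ok {
			continue
		}
		l, err := newLogger(f)
		if err != nil {
			return nil, fmt.Errorf("reopening scrape failure log file %q: %w", f, err)
		}
		loggers[f] = l
	}
	return loggers, nil
}

func main() {
	// Stub factory; the manager would use its newScrapeFailureLogger hook here.
	stub := func(string) (FailureLogger, error) { return nil, nil }

	files := []string{"", "/var/log/prometheus/job1-failures.log", "/var/log/prometheus/job1-failures.log"}
	loggers, err := buildFailureLoggers(files, stub)
	fmt.Println(len(loggers), err) // 2 <nil>: the "" sentinel plus one distinct file.
}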

View file

@ -62,6 +62,15 @@ var AlignScrapeTimestamps = true
var errNameLabelMandatory = fmt.Errorf("missing metric name (%s label)", labels.MetricName)
var _ FailureLogger = (*logging.JSONFileLogger)(nil)
// FailureLogger is an interface that can be used to log all failed
// scrapes.
type FailureLogger interface {
slog.Handler
io.Closer
}
// scrapePool manages scrapes for sets of targets.
type scrapePool struct {
appendable storage.Appendable
@ -91,7 +100,7 @@ type scrapePool struct {
metrics *scrapeMetrics
scrapeFailureLogger *logging.JSONFileLogger
scrapeFailureLogger FailureLogger
scrapeFailureLoggerMtx sync.RWMutex
}
@ -224,11 +233,11 @@ func (sp *scrapePool) DroppedTargetsCount() int {
return sp.droppedTargetsCount
}
func (sp *scrapePool) SetScrapeFailureLogger(l *logging.JSONFileLogger) {
func (sp *scrapePool) SetScrapeFailureLogger(l FailureLogger) {
sp.scrapeFailureLoggerMtx.Lock()
defer sp.scrapeFailureLoggerMtx.Unlock()
if l != nil {
l.With("job_name", sp.config.JobName)
l = slog.New(l).With("job_name", sp.config.JobName).Handler().(FailureLogger)
}
sp.scrapeFailureLogger = l
@ -239,7 +248,7 @@ func (sp *scrapePool) SetScrapeFailureLogger(l *logging.JSONFileLogger) {
}
}
func (sp *scrapePool) getScrapeFailureLogger() *logging.JSONFileLogger {
func (sp *scrapePool) getScrapeFailureLogger() FailureLogger {
sp.scrapeFailureLoggerMtx.RLock()
defer sp.scrapeFailureLoggerMtx.RUnlock()
return sp.scrapeFailureLogger
@ -866,7 +875,7 @@ func (s *targetScraper) readResponse(ctx context.Context, resp *http.Response, w
type loop interface {
run(errc chan<- error)
setForcedError(err error)
setScrapeFailureLogger(*logging.JSONFileLogger)
setScrapeFailureLogger(FailureLogger)
stop()
getCache() *scrapeCache
disableEndOfRunStalenessMarkers()
@ -882,7 +891,7 @@ type cacheEntry struct {
type scrapeLoop struct {
scraper scraper
l *slog.Logger
scrapeFailureLogger *logging.JSONFileLogger
scrapeFailureLogger FailureLogger
scrapeFailureLoggerMtx sync.RWMutex
cache *scrapeCache
lastScrapeSize int
@ -1282,11 +1291,11 @@ func newScrapeLoop(ctx context.Context,
return sl
}
func (sl *scrapeLoop) setScrapeFailureLogger(l *logging.JSONFileLogger) {
func (sl *scrapeLoop) setScrapeFailureLogger(l FailureLogger) {
sl.scrapeFailureLoggerMtx.Lock()
defer sl.scrapeFailureLoggerMtx.Unlock()
if ts, ok := sl.scraper.(fmt.Stringer); ok && l != nil {
l.With("target", ts.String())
l = slog.New(l).With("target", ts.String()).Handler().(FailureLogger)
}
sl.scrapeFailureLogger = l
}
@ -1436,7 +1445,7 @@ func (sl *scrapeLoop) scrapeAndReport(last, appendTime time.Time, errc chan<- er
sl.l.Debug("Scrape failed", "err", scrapeErr)
sl.scrapeFailureLoggerMtx.RLock()
if sl.scrapeFailureLogger != nil {
sl.scrapeFailureLogger.Log(context.Background(), slog.LevelError, scrapeErr.Error())
slog.New(sl.scrapeFailureLogger).Error(scrapeErr.Error())
}
sl.scrapeFailureLoggerMtx.RUnlock()
if errc != nil {
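
Both SetScrapeFailureLogger (pool) and setScrapeFailureLogger (loop) above rely on the same trick: to attach a permanent job_name or target attribute, the handler is wrapped in a slog.Logger, With() derives a new handler, and the result is asserted back to FailureLogger. That assertion only holds because JSONFileLogger.WithAttrs (see the util/logging hunk below) returns another *JSONFileLogger, so the derived handler keeps its Close method. A self-contained sketch with a toy handler in place of JSONFileLogger:

package main

import (
	"fmt"
	"io"
	"log/slog"
	"os"
)

// FailureLogger mirrors the interface added above.
type FailureLogger interface {
	slog.Handler
	io.Closer
}

// closableHandler is a toy stand-in for logging.JSONFileLogger: an
// slog.Handler plus Close whose WithAttrs preserves the concrete type.
type closableHandler struct {
	slog.Handler
}

func (h *closableHandler) Close() error { return nil }

func (h *closableHandler) WithAttrs(attrs []slog.Attr) slog.Handler {
	if len(attrs) == 0 {
		return h
	}
	return &closableHandler{Handler: h.Handler.WithAttrs(attrs)}
}

func main() {
	var l FailureLogger = &closableHandler{Handler: slog.NewJSONHandler(os.Stdout, nil)}

	// The same dance as SetScrapeFailureLogger: derive a handler carrying
	// job_name, then assert it back to FailureLogger. The assertion succeeds
	// only because WithAttrs returned a Closer-capable type.
	l = slog.New(l).With("job_name", "node").Handler().(FailureLogger)

	// And the same call shape as scrapeAndReport uses on a failed scrape.
	slog.New(l).Error("context deadline exceeded")

	fmt.Println(l.Close())
}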

View file

@ -57,7 +57,6 @@ import (
"github.com/prometheus/prometheus/model/value"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/prometheus/prometheus/util/logging"
"github.com/prometheus/prometheus/util/pool"
"github.com/prometheus/prometheus/util/teststorage"
"github.com/prometheus/prometheus/util/testutil"
@ -395,7 +394,7 @@ type testLoop struct {
timeout time.Duration
}
func (l *testLoop) setScrapeFailureLogger(*logging.JSONFileLogger) {
func (l *testLoop) setScrapeFailureLogger(FailureLogger) {
}
func (l *testLoop) run(errc chan<- error) {

View file

@ -16,16 +16,22 @@ package logging
import (
"context"
"fmt"
"io"
"log/slog"
"os"
"github.com/prometheus/common/promslog"
)
// JSONFileLogger represents a logger that writes JSON to a file. It implements the promql.QueryLogger interface.
var _ slog.Handler = (*JSONFileLogger)(nil)
var _ io.Closer = (*JSONFileLogger)(nil)
// JSONFileLogger represents a logger that writes JSON to a file. It implements
// the slog.Handler interface, as well as the io.Closer interface.
type JSONFileLogger struct {
logger *slog.Logger
file *os.File
handler slog.Handler
file *os.File
}
// NewJSONFileLogger returns a new JSONFileLogger.
@ -42,24 +48,47 @@ func NewJSONFileLogger(s string) (*JSONFileLogger, error) {
jsonFmt := &promslog.AllowedFormat{}
_ = jsonFmt.Set("json")
return &JSONFileLogger{
logger: promslog.New(&promslog.Config{Format: jsonFmt, Writer: f}),
file: f,
handler: promslog.New(&promslog.Config{Format: jsonFmt, Writer: f}).Handler(),
file: f,
}, nil
}
// Close closes the underlying file. It implements the promql.QueryLogger interface.
// Close closes the underlying file. It implements the io.Closer interface.
func (l *JSONFileLogger) Close() error {
return l.file.Close()
}
// With calls the `With()` method on the underlying `log/slog.Logger` with the
// provided msg and args. It implements the promql.QueryLogger interface.
func (l *JSONFileLogger) With(args ...any) {
l.logger = l.logger.With(args...)
// Enabled returns true if and only if the internal slog.Handler is enabled. It
// implements the slog.Handler interface.
func (l *JSONFileLogger) Enabled(ctx context.Context, level slog.Level) bool {
return l.handler.Enabled(ctx, level)
}
// Log calls the `Log()` method on the underlying `log/slog.Logger` with the
// provided msg and args. It implements the promql.QueryLogger interface.
func (l *JSONFileLogger) Log(ctx context.Context, level slog.Level, msg string, args ...any) {
l.logger.Log(ctx, level, msg, args...)
// Handle takes a record created by an slog.Logger and forwards it to the
// internal slog.Handler for dispatching the log call to the backing file. It
// implements the slog.Handler interface.
func (l *JSONFileLogger) Handle(ctx context.Context, r slog.Record) error {
return l.handler.Handle(ctx, r.Clone())
}
// WithAttrs returns a new *JSONFileLogger with a new internal handler that has
// the provided attrs attached as attributes on all further log calls. It
// implements the slog.Handler interface.
func (l *JSONFileLogger) WithAttrs(attrs []slog.Attr) slog.Handler {
if len(attrs) == 0 {
return l
}
return &JSONFileLogger{file: l.file, handler: l.handler.WithAttrs(attrs)}
}
// WithGroup returns a new *JSONFileLogger with a new internal handler that has
// the provided group name attached, to group all other attributes added to the
// logger. It implements the slog.Handler interface.
func (l *JSONFileLogger) WithGroup(name string) slog.Handler {
if name == "" {
return l
}
return &JSONFileLogger{file: l.file, handler: l.handler.WithGroup(name)}
}
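
In short, JSONFileLogger is now used exclusively through slog.New: callers get a *slog.Logger, and With/WithGroup flow through the WithAttrs/WithGroup methods above, which return a new *JSONFileLogger sharing the same *os.File, so the derived handler still satisfies promql.QueryLogger and scrape.FailureLogger. A brief usage sketch; the path is illustrative:

package main

import (
	"log"
	"log/slog"

	"github.com/prometheus/prometheus/util/logging"
)

func main() {
	// Illustrative path; NewJSONFileLogger returns an slog.Handler that is
	// also an io.Closer, backed by this file.
	h, err := logging.NewJSONFileLogger("/tmp/query.log")
	if err != nil {
		log.Fatal(err)
	}
	defer h.Close()

	// All logging goes through a slog.Logger wrapped around the handler.
	// With() calls WithAttrs, which hands back another *JSONFileLogger on
	// the same file, so the attribute sticks and Close still works.
	logger := slog.New(h).With("component", "query-logger")
	logger.Info("promql query logged", "params", map[string]interface{}{"query": "up"})
}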

View file

@ -14,7 +14,6 @@
package logging
import (
"context"
"log/slog"
"os"
"strings"
@ -24,6 +23,19 @@ import (
"github.com/stretchr/testify/require"
)
func getLogLines(t *testing.T, name string) []string {
content, err := os.ReadFile(name)
require.NoError(t, err)
lines := strings.Split(string(content), "\n")
for i := len(lines) - 1; i >= 0; i-- {
if lines[i] == "" {
lines = append(lines[:i], lines[i+1:]...)
}
}
return lines
}
func TestJSONFileLogger_basic(t *testing.T) {
f, err := os.CreateTemp("", "logging")
require.NoError(t, err)
@ -32,24 +44,23 @@ func TestJSONFileLogger_basic(t *testing.T) {
require.NoError(t, os.Remove(f.Name()))
}()
l, err := NewJSONFileLogger(f.Name())
logHandler, err := NewJSONFileLogger(f.Name())
require.NoError(t, err)
require.NotNil(t, l, "logger can't be nil")
require.NotNil(t, logHandler, "logger handler can't be nil")
l.Log(context.Background(), slog.LevelInfo, "test", "hello", "world")
require.NoError(t, err)
r := make([]byte, 1024)
_, err = f.Read(r)
require.NoError(t, err)
logger := slog.New(logHandler)
logger.Info("test", "hello", "world")
result, err := regexp.Match(`^{"time":"[^"]+","level":"INFO","source":"file.go:\d+","msg":"test","hello":"world"}\n`, r)
r := getLogLines(t, f.Name())
require.Len(t, r, 1, "expected 1 log line")
result, err := regexp.Match(`^{"time":"[^"]+","level":"INFO","source":"\w+.go:\d+","msg":"test","hello":"world"}`, []byte(r[0]))
require.NoError(t, err)
require.True(t, result, "unexpected content: %s", r)
err = l.Close()
err = logHandler.Close()
require.NoError(t, err)
err = l.file.Close()
err = logHandler.file.Close()
require.Error(t, err)
require.True(t, strings.HasSuffix(err.Error(), os.ErrClosed.Error()), "file not closed")
}
@ -62,31 +73,31 @@ func TestJSONFileLogger_parallel(t *testing.T) {
require.NoError(t, os.Remove(f.Name()))
}()
l, err := NewJSONFileLogger(f.Name())
logHandler, err := NewJSONFileLogger(f.Name())
require.NoError(t, err)
require.NotNil(t, l, "logger can't be nil")
require.NotNil(t, logHandler, "logger handler can't be nil")
l.Log(context.Background(), slog.LevelInfo, "test", "hello", "world")
logger := slog.New(logHandler)
logger.Info("test", "hello", "world")
logHandler2, err := NewJSONFileLogger(f.Name())
require.NoError(t, err)
require.NotNil(t, logHandler2, "logger handler can't be nil")
logger2 := slog.New(logHandler2)
logger2.Info("test", "hello", "world")
err = logHandler.Close()
require.NoError(t, err)
l2, err := NewJSONFileLogger(f.Name())
require.NoError(t, err)
require.NotNil(t, l, "logger can't be nil")
l2.Log(context.Background(), slog.LevelInfo, "test", "hello", "world")
require.NoError(t, err)
err = l.Close()
require.NoError(t, err)
err = l.file.Close()
err = logHandler.file.Close()
require.Error(t, err)
require.True(t, strings.HasSuffix(err.Error(), os.ErrClosed.Error()), "file not closed")
err = l2.Close()
err = logHandler2.Close()
require.NoError(t, err)
err = l2.file.Close()
err = logHandler2.file.Close()
require.Error(t, err)
require.True(t, strings.HasSuffix(err.Error(), os.ErrClosed.Error()), "file not closed")
}