From 9d9bc524e5e893287cfeeab9995252f54b0624a8 Mon Sep 17 00:00:00 2001 From: Julien Pivotto Date: Wed, 8 Jan 2020 14:28:43 +0100 Subject: [PATCH] Add query log (#6520) * Add query log, make stats logged in JSON like in the API Signed-off-by: Julien Pivotto --- cmd/prometheus/main.go | 12 +++ config/config.go | 5 +- config/config_test.go | 1 + docs/configuration/configuration.md | 4 + docs/stability.md | 1 + pkg/logging/dedupe.go | 1 + pkg/logging/dedupe_test.go | 1 + pkg/logging/file.go | 62 +++++++++++++ pkg/logging/file_test.go | 48 ++++++++++ pkg/logging/ratelimit.go | 1 + promql/engine.go | 101 +++++++++++++++++++-- promql/engine_test.go | 133 ++++++++++++++++++++++++++++ promql/query_logger.go | 2 +- web/api/v1/api.go | 19 ++++ web/api/v1/api_test.go | 5 +- 15 files changed, 386 insertions(+), 10 deletions(-) create mode 100644 pkg/logging/file.go create mode 100644 pkg/logging/file_test.go diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index f4e6b84f4..f876eb4b7 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -48,6 +48,7 @@ import ( "github.com/prometheus/prometheus/discovery" sd_config "github.com/prometheus/prometheus/discovery/config" "github.com/prometheus/prometheus/notifier" + "github.com/prometheus/prometheus/pkg/logging" "github.com/prometheus/prometheus/pkg/relabel" prom_runtime "github.com/prometheus/prometheus/pkg/runtime" "github.com/prometheus/prometheus/promql" @@ -420,6 +421,17 @@ func main() { reloaders := []func(cfg *config.Config) error{ remoteStorage.ApplyConfig, webHandler.ApplyConfig, + func(cfg *config.Config) error { + if cfg.GlobalConfig.QueryLogFile == "" { + return queryEngine.SetQueryLogger(nil) + } + + l, err := logging.NewJSONFileLogger(cfg.GlobalConfig.QueryLogFile) + if err != nil { + return err + } + return queryEngine.SetQueryLogger(l) + }, // The Scrape and notifier managers need to reload before the Discovery manager as // they need to read the most updated config when receiving the new targets list. scrapeManager.ApplyConfig, diff --git a/config/config.go b/config/config.go index 794ccc0ee..11667a826 100644 --- a/config/config.go +++ b/config/config.go @@ -299,6 +299,8 @@ type GlobalConfig struct { ScrapeTimeout model.Duration `yaml:"scrape_timeout,omitempty"` // How frequently to evaluate rules by default. EvaluationInterval model.Duration `yaml:"evaluation_interval,omitempty"` + // File to which PromQL queries are logged. + QueryLogFile string `yaml:"query_log_file,omitempty"` // The labels to add to any timeseries that this Prometheus instance scrapes. ExternalLabels labels.Labels `yaml:"external_labels,omitempty"` } @@ -349,7 +351,8 @@ func (c *GlobalConfig) isZero() bool { return c.ExternalLabels == nil && c.ScrapeInterval == 0 && c.ScrapeTimeout == 0 && - c.EvaluationInterval == 0 + c.EvaluationInterval == 0 && + c.QueryLogFile == "" } // ScrapeConfig configures a scraping unit for Prometheus. diff --git a/config/config_test.go b/config/config_test.go index a15b527ac..4fbbded64 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -57,6 +57,7 @@ var expectedConf = &Config{ ScrapeInterval: model.Duration(15 * time.Second), ScrapeTimeout: DefaultGlobalConfig.ScrapeTimeout, EvaluationInterval: model.Duration(30 * time.Second), + QueryLogFile: "", ExternalLabels: labels.Labels{ {Name: "foo", Value: "bar"}, diff --git a/docs/configuration/configuration.md b/docs/configuration/configuration.md index 222594f31..413f6f26f 100644 --- a/docs/configuration/configuration.md +++ b/docs/configuration/configuration.md @@ -66,6 +66,10 @@ global: external_labels: [ : ... ] + # File to which PromQL queries are logged. + # Reloading the configuration will reopen the file. + [ query_log_file: ] + # Rule files specifies a list of globs. Rules and alerts are read from # all matching files. rule_files: diff --git a/docs/stability.md b/docs/stability.md index 1e10c0b5f..bf4c4323a 100644 --- a/docs/stability.md +++ b/docs/stability.md @@ -30,6 +30,7 @@ Things considered unstable for 2.x: * HTML generated by the web UI * The metrics in the /metrics endpoint of Prometheus itself * Exact on-disk format. Potential changes however, will be forward compatible and transparently handled by Prometheus +* The format of the logs As long as you are not using any features marked as experimental/unstable, an upgrade within a major version can usually be performed without any operational diff --git a/pkg/logging/dedupe.go b/pkg/logging/dedupe.go index f040b2f23..1d911ca2f 100644 --- a/pkg/logging/dedupe.go +++ b/pkg/logging/dedupe.go @@ -10,6 +10,7 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. + package logging import ( diff --git a/pkg/logging/dedupe_test.go b/pkg/logging/dedupe_test.go index bca7aeadd..da05ff423 100644 --- a/pkg/logging/dedupe_test.go +++ b/pkg/logging/dedupe_test.go @@ -10,6 +10,7 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. + package logging import ( diff --git a/pkg/logging/file.go b/pkg/logging/file.go new file mode 100644 index 000000000..be118fad0 --- /dev/null +++ b/pkg/logging/file.go @@ -0,0 +1,62 @@ +// Copyright 2020 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package logging + +import ( + "os" + "time" + + "github.com/go-kit/kit/log" + "github.com/pkg/errors" +) + +var ( + timestampFormat = log.TimestampFormat( + func() time.Time { return time.Now().UTC() }, + "2006-01-02T15:04:05.000Z07:00", + ) +) + +// JSONFileLogger represents a logger that writes JSON to a file. +type JSONFileLogger struct { + logger log.Logger + file *os.File +} + +// NewJSONFileLogger returns a new JSONFileLogger. +func NewJSONFileLogger(s string) (*JSONFileLogger, error) { + if s == "" { + return nil, nil + } + + f, err := os.OpenFile(s, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0666) + if err != nil { + return nil, errors.Wrap(err, "can't create json logger") + } + + return &JSONFileLogger{ + logger: log.With(log.NewJSONLogger(f), "ts", timestampFormat), + file: f, + }, nil +} + +// Close closes the underlying file. +func (l *JSONFileLogger) Close() error { + return l.file.Close() +} + +// Log calls the Log function of the underlying logger. +func (l *JSONFileLogger) Log(i ...interface{}) error { + return l.logger.Log(i...) +} diff --git a/pkg/logging/file_test.go b/pkg/logging/file_test.go new file mode 100644 index 000000000..dbfab346f --- /dev/null +++ b/pkg/logging/file_test.go @@ -0,0 +1,48 @@ +// Copyright 2020 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package logging + +import ( + "errors" + "io/ioutil" + "os" + "regexp" + "testing" + + "github.com/prometheus/prometheus/util/testutil" +) + +func TestJSONFileLogger_basic(t *testing.T) { + f, err := ioutil.TempFile("", "") + testutil.Ok(t, err) + defer f.Close() + + l, err := NewJSONFileLogger(f.Name()) + testutil.Ok(t, err) + testutil.Assert(t, l != nil, "logger can't be nil") + + l.Log("test", "yes") + r := make([]byte, 1024) + _, err = f.Read(r) + testutil.Ok(t, err) + result, err := regexp.Match(`^{"test":"yes","ts":"[^"]+"}\n`, r) + testutil.Ok(t, err) + testutil.Assert(t, result, "unexpected content: %s", r) + + err = l.Close() + testutil.Ok(t, err) + + err = l.file.Close() + testutil.Assert(t, errors.Is(err, os.ErrClosed), "file was not closed") +} diff --git a/pkg/logging/ratelimit.go b/pkg/logging/ratelimit.go index bf4d9dbda..d3567eaa0 100644 --- a/pkg/logging/ratelimit.go +++ b/pkg/logging/ratelimit.go @@ -10,6 +10,7 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. + package logging import ( diff --git a/promql/engine.go b/promql/engine.go index 6afb3d4fb..bdecaa8d8 100644 --- a/promql/engine.go +++ b/promql/engine.go @@ -76,6 +76,8 @@ func GetDefaultEvaluationInterval() int64 { type engineMetrics struct { currentQueries prometheus.Gauge maxConcurrentQueries prometheus.Gauge + queryLogEnabled prometheus.Gauge + queryLogFailures prometheus.Counter queryQueueTime prometheus.Summary queryPrepareTime prometheus.Summary queryInnerEval prometheus.Summary @@ -112,6 +114,13 @@ func (e ErrStorage) Error() string { return e.Err.Error() } +// QueryLogger is an interface that can be used to log all the queries logged +// by the engine. +type QueryLogger interface { + Log(...interface{}) error + Close() error +} + // A Query is derived from an a raw query string and can be run against an engine // it is associated with. type Query interface { @@ -146,6 +155,10 @@ type query struct { ng *Engine } +type queryCtx int + +var queryOrigin queryCtx + // Statement implements the Query interface. func (q *query) Statement() Statement { return q.stmt @@ -177,18 +190,14 @@ func (q *query) Exec(ctx context.Context) *Result { } // Log query in active log. - var queryIndex int if q.ng.activeQueryTracker != nil { - queryIndex = q.ng.activeQueryTracker.Insert(q.q) + queryIndex := q.ng.activeQueryTracker.Insert(q.q) + defer q.ng.activeQueryTracker.Delete(queryIndex) } // Exec query. res, warnings, err := q.ng.exec(ctx, q) - // Delete query from active log. - if q.ng.activeQueryTracker != nil { - q.ng.activeQueryTracker.Delete(queryIndex) - } return &Result{Err: err, Value: res, Warnings: warnings} } @@ -230,6 +239,8 @@ type Engine struct { gate *gate.Gate maxSamplesPerQuery int activeQueryTracker *ActiveQueryTracker + queryLogger QueryLogger + queryLoggerLock sync.RWMutex } // NewEngine returns a new engine. @@ -245,6 +256,18 @@ func NewEngine(opts EngineOpts) *Engine { Name: "queries", Help: "The current number of queries being executed or waiting.", }), + queryLogEnabled: prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "query_log_enabled", + Help: "State of the query log.", + }), + queryLogFailures: prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "query_log_failures_total", + Help: "The number of query log failures.", + }), maxConcurrentQueries: prometheus.NewGauge(prometheus.GaugeOpts{ Namespace: namespace, Subsystem: subsystem, @@ -290,6 +313,8 @@ func NewEngine(opts EngineOpts) *Engine { opts.Reg.MustRegister( metrics.currentQueries, metrics.maxConcurrentQueries, + metrics.queryLogEnabled, + metrics.queryLogFailures, metrics.queryQueueTime, metrics.queryPrepareTime, metrics.queryInnerEval, @@ -307,6 +332,31 @@ func NewEngine(opts EngineOpts) *Engine { } } +// SetQueryLogger sets the query logger. +func (ng *Engine) SetQueryLogger(l QueryLogger) error { + ng.queryLoggerLock.Lock() + defer ng.queryLoggerLock.Unlock() + + if ng.queryLogger != nil { + // An error closing the old file descriptor should + // not make reload fail; only log a warning. + err := ng.queryLogger.Close() + if err != nil { + level.Warn(ng.logger).Log("msg", "error while closing the previous query log file", "err", err) + } + } + + ng.queryLogger = l + + if l != nil { + ng.metrics.queryLogEnabled.Set(1) + } else { + ng.metrics.queryLogEnabled.Set(0) + } + + return nil +} + // NewInstantQuery returns an evaluation query for the given expression at the given time. func (ng *Engine) NewInstantQuery(q storage.Queryable, qs string, ts time.Time) (Query, error) { expr, err := ParseExpr(qs) @@ -372,13 +422,41 @@ func (ng *Engine) newTestQuery(f func(context.Context) error) Query { // // At this point per query only one EvalStmt is evaluated. Alert and record // statements are not handled by the Engine. -func (ng *Engine) exec(ctx context.Context, q *query) (Value, storage.Warnings, error) { +func (ng *Engine) exec(ctx context.Context, q *query) (v Value, w storage.Warnings, err error) { ng.metrics.currentQueries.Inc() defer ng.metrics.currentQueries.Dec() ctx, cancel := context.WithTimeout(ctx, ng.timeout) q.cancel = cancel + defer func() { + ng.queryLoggerLock.RLock() + if l := ng.queryLogger; l != nil { + f := []interface{}{"query", q.q} + if err != nil { + f = append(f, "error", err) + } + if eq, ok := q.Statement().(*EvalStmt); ok { + f = append(f, + "start", formatDate(eq.Start), + "end", formatDate(eq.End), + "step", eq.Interval.String(), + ) + } + f = append(f, "stats", stats.NewQueryStats(q.Stats())) + if origin := ctx.Value(queryOrigin); origin != nil { + for k, v := range origin.(map[string]string) { + f = append(f, k, v) + } + } + if err := l.Log(f...); err != nil { + ng.metrics.queryLogFailures.Inc() + level.Error(ng.logger).Log("msg", "can't log query", "err", err) + } + } + ng.queryLoggerLock.RUnlock() + }() + execSpanTimer, ctx := q.stats.GetSpanTimer(ctx, stats.ExecTotalTime) defer execSpanTimer.Finish() @@ -2000,6 +2078,11 @@ func shouldDropMetricName(op ItemType) bool { } } +// NewOriginContext returns a new context with data about the origin attached. +func NewOriginContext(ctx context.Context, data map[string]string) context.Context { + return context.WithValue(ctx, queryOrigin, data) +} + // documentedType returns the internal type to the equivalent // user facing terminology as defined in the documentation. func documentedType(t ValueType) string { @@ -2012,3 +2095,7 @@ func documentedType(t ValueType) string { return string(t) } } + +func formatDate(t time.Time) string { + return t.UTC().Format("2006-01-02T15:04:05.000Z07:00") +} diff --git a/promql/engine_test.go b/promql/engine_test.go index 15b43af9d..7aa86921e 100644 --- a/promql/engine_test.go +++ b/promql/engine_test.go @@ -1100,3 +1100,136 @@ func TestSubquerySelector(t *testing.T) { } } } + +type FakeQueryLogger struct { + closed bool + logs []interface{} +} + +func NewFakeQueryLogger() *FakeQueryLogger { + return &FakeQueryLogger{ + closed: false, + logs: make([]interface{}, 0), + } +} + +func (f *FakeQueryLogger) Close() error { + f.closed = true + return nil +} + +func (f *FakeQueryLogger) Log(l ...interface{}) error { + f.logs = append(f.logs, l...) + return nil +} + +func TestQueryLogger_basic(t *testing.T) { + opts := EngineOpts{ + Logger: nil, + Reg: nil, + MaxConcurrent: 10, + MaxSamples: 10, + Timeout: 10 * time.Second, + } + engine := NewEngine(opts) + + queryExec := func() { + ctx, cancelCtx := context.WithCancel(context.Background()) + defer cancelCtx() + query := engine.newTestQuery(func(ctx context.Context) error { + return contextDone(ctx, "test statement execution") + }) + res := query.Exec(ctx) + testutil.Ok(t, res.Err) + } + + // Query works without query log initalized. + queryExec() + + f1 := NewFakeQueryLogger() + engine.SetQueryLogger(f1) + queryExec() + for i, field := range []string{"query", "test statement"} { + testutil.Assert(t, f1.logs[i].(string) == field, "expected %v as key, got %v", field, f1.logs[i]) + } + + l := len(f1.logs) + queryExec() + testutil.Assert(t, 2*l == len(f1.logs), "expected %d fields in logs, got %v", 2*l, len(f1.logs)) + + // Test that we close the query logger when unsetting it. + testutil.Assert(t, !f1.closed, "expected f1 to be open, got closed") + engine.SetQueryLogger(nil) + testutil.Assert(t, f1.closed, "expected f1 to be closed, got open") + queryExec() + + // Test that we close the query logger when swapping. + f2 := NewFakeQueryLogger() + f3 := NewFakeQueryLogger() + engine.SetQueryLogger(f2) + testutil.Assert(t, !f2.closed, "expected f2 to be open, got closed") + queryExec() + engine.SetQueryLogger(f3) + testutil.Assert(t, f2.closed, "expected f2 to be closed, got open") + testutil.Assert(t, !f3.closed, "expected f3 to be open, got closed") + queryExec() +} + +func TestQueryLogger_fields(t *testing.T) { + opts := EngineOpts{ + Logger: nil, + Reg: nil, + MaxConcurrent: 10, + MaxSamples: 10, + Timeout: 10 * time.Second, + } + engine := NewEngine(opts) + + f1 := NewFakeQueryLogger() + engine.SetQueryLogger(f1) + + ctx, cancelCtx := context.WithCancel(context.Background()) + ctx = NewOriginContext(ctx, map[string]string{"foo": "bar"}) + defer cancelCtx() + query := engine.newTestQuery(func(ctx context.Context) error { + return contextDone(ctx, "test statement execution") + }) + + res := query.Exec(ctx) + testutil.Ok(t, res.Err) + + expected := []string{"foo", "bar"} + for i, field := range expected { + v := f1.logs[len(f1.logs)-len(expected)+i].(string) + testutil.Assert(t, field == v, "expected %v as key, got %v", field, v) + } +} + +func TestQueryLogger_error(t *testing.T) { + opts := EngineOpts{ + Logger: nil, + Reg: nil, + MaxConcurrent: 10, + MaxSamples: 10, + Timeout: 10 * time.Second, + } + engine := NewEngine(opts) + + f1 := NewFakeQueryLogger() + engine.SetQueryLogger(f1) + + ctx, cancelCtx := context.WithCancel(context.Background()) + ctx = NewOriginContext(ctx, map[string]string{"foo": "bar"}) + defer cancelCtx() + testErr := errors.New("failure") + query := engine.newTestQuery(func(ctx context.Context) error { + return testErr + }) + + res := query.Exec(ctx) + testutil.NotOk(t, res.Err, "query should have failed") + + for i, field := range []interface{}{"query", "test statement", "error", testErr} { + testutil.Assert(t, f1.logs[i] == field, "expected %v as key, got %v", field, f1.logs[i]) + } +} diff --git a/promql/query_logger.go b/promql/query_logger.go index 1014ade40..948cfea96 100644 --- a/promql/query_logger.go +++ b/promql/query_logger.go @@ -80,7 +80,7 @@ func logUnfinishedQueries(filename string, filesize int, logger log.Logger) { func getMMapedFile(filename string, filesize int, logger log.Logger) ([]byte, error) { - file, err := os.OpenFile(filename, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0644) + file, err := os.OpenFile(filename, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0666) if err != nil { level.Error(logger).Log("msg", "Error opening query log file", "file", filename, "err", err) return nil, err diff --git a/web/api/v1/api.go b/web/api/v1/api.go index 4909f9831..871296e89 100644 --- a/web/api/v1/api.go +++ b/web/api/v1/api.go @@ -18,6 +18,7 @@ import ( "fmt" "math" "math/rand" + "net" "net/http" "net/url" "os" @@ -341,6 +342,11 @@ func (api *API) query(r *http.Request) apiFuncResult { return apiFuncResult{nil, &apiError{errorBadData, err}, nil, nil} } + ctx, err = contextFromRequest(ctx, r) + if err != nil { + return apiFuncResult{nil, returnAPIError(err), nil, nil} + } + res := qry.Exec(ctx) if res.Err != nil { return apiFuncResult{nil, returnAPIError(res.Err), res.Warnings, qry.Close} @@ -411,6 +417,11 @@ func (api *API) queryRange(r *http.Request) apiFuncResult { return apiFuncResult{nil, &apiError{errorBadData, err}, nil, nil} } + ctx, err = contextFromRequest(ctx, r) + if err != nil { + return apiFuncResult{nil, returnAPIError(err), nil, nil} + } + res := qry.Exec(ctx) if res.Err != nil { return apiFuncResult{nil, returnAPIError(res.Err), res.Warnings, qry.Close} @@ -1469,3 +1480,11 @@ func marshalPointJSON(ptr unsafe.Pointer, stream *jsoniter.Stream) { func marshalPointJSONIsEmpty(ptr unsafe.Pointer) bool { return false } + +func contextFromRequest(ctx context.Context, r *http.Request) (context.Context, error) { + ip, _, err := net.SplitHostPort(r.RemoteAddr) + if err != nil { + return ctx, err + } + return promql.NewOriginContext(ctx, map[string]string{"clientIP": ip}), nil +} diff --git a/web/api/v1/api_test.go b/web/api/v1/api_test.go index f5f485259..063c6721b 100644 --- a/web/api/v1/api_test.go +++ b/web/api/v1/api_test.go @@ -1466,9 +1466,12 @@ func testEndpoints(t *testing.T, api *API, tr *testTargetRetriever, testLabelAPI if m == http.MethodPost { r, err := http.NewRequest(m, "http://example.com", strings.NewReader(q.Encode())) r.Header.Set("Content-Type", "application/x-www-form-urlencoded") + r.RemoteAddr = "127.0.0.1:20201" return r, err } - return http.NewRequest(m, fmt.Sprintf("http://example.com?%s", q.Encode()), nil) + r, err := http.NewRequest(m, fmt.Sprintf("http://example.com?%s", q.Encode()), nil) + r.RemoteAddr = "127.0.0.1:20201" + return r, err } for i, test := range tests {