Add query log (#6520)

* Add query log, make stats logged in JSON like in the API

Signed-off-by: Julien Pivotto <roidelapluie@inuits.eu>
This commit is contained in:
Julien Pivotto 2020-01-08 14:28:43 +01:00 committed by Brian Brazil
parent 3d6cf1c289
commit 9d9bc524e5
15 changed files with 386 additions and 10 deletions

View file

@ -48,6 +48,7 @@ import (
"github.com/prometheus/prometheus/discovery"
sd_config "github.com/prometheus/prometheus/discovery/config"
"github.com/prometheus/prometheus/notifier"
"github.com/prometheus/prometheus/pkg/logging"
"github.com/prometheus/prometheus/pkg/relabel"
prom_runtime "github.com/prometheus/prometheus/pkg/runtime"
"github.com/prometheus/prometheus/promql"
@ -420,6 +421,17 @@ func main() {
reloaders := []func(cfg *config.Config) error{
remoteStorage.ApplyConfig,
webHandler.ApplyConfig,
func(cfg *config.Config) error {
if cfg.GlobalConfig.QueryLogFile == "" {
return queryEngine.SetQueryLogger(nil)
}
l, err := logging.NewJSONFileLogger(cfg.GlobalConfig.QueryLogFile)
if err != nil {
return err
}
return queryEngine.SetQueryLogger(l)
},
// The Scrape and notifier managers need to reload before the Discovery manager as
// they need to read the most updated config when receiving the new targets list.
scrapeManager.ApplyConfig,

View file

@ -299,6 +299,8 @@ type GlobalConfig struct {
ScrapeTimeout model.Duration `yaml:"scrape_timeout,omitempty"`
// How frequently to evaluate rules by default.
EvaluationInterval model.Duration `yaml:"evaluation_interval,omitempty"`
// File to which PromQL queries are logged.
QueryLogFile string `yaml:"query_log_file,omitempty"`
// The labels to add to any timeseries that this Prometheus instance scrapes.
ExternalLabels labels.Labels `yaml:"external_labels,omitempty"`
}
@ -349,7 +351,8 @@ func (c *GlobalConfig) isZero() bool {
return c.ExternalLabels == nil &&
c.ScrapeInterval == 0 &&
c.ScrapeTimeout == 0 &&
c.EvaluationInterval == 0
c.EvaluationInterval == 0 &&
c.QueryLogFile == ""
}
// ScrapeConfig configures a scraping unit for Prometheus.

View file

@ -57,6 +57,7 @@ var expectedConf = &Config{
ScrapeInterval: model.Duration(15 * time.Second),
ScrapeTimeout: DefaultGlobalConfig.ScrapeTimeout,
EvaluationInterval: model.Duration(30 * time.Second),
QueryLogFile: "",
ExternalLabels: labels.Labels{
{Name: "foo", Value: "bar"},

View file

@ -66,6 +66,10 @@ global:
external_labels:
[ <labelname>: <labelvalue> ... ]
# File to which PromQL queries are logged.
# Reloading the configuration will reopen the file.
[ query_log_file: <string> ]
# Rule files specifies a list of globs. Rules and alerts are read from
# all matching files.
rule_files:

View file

@ -30,6 +30,7 @@ Things considered unstable for 2.x:
* HTML generated by the web UI
* The metrics in the /metrics endpoint of Prometheus itself
* Exact on-disk format. Potential changes however, will be forward compatible and transparently handled by Prometheus
* The format of the logs
As long as you are not using any features marked as experimental/unstable, an
upgrade within a major version can usually be performed without any operational

View file

@ -10,6 +10,7 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package logging
import (

View file

@ -10,6 +10,7 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package logging
import (

62
pkg/logging/file.go Normal file
View file

@ -0,0 +1,62 @@
// Copyright 2020 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package logging
import (
"os"
"time"
"github.com/go-kit/kit/log"
"github.com/pkg/errors"
)
var (
timestampFormat = log.TimestampFormat(
func() time.Time { return time.Now().UTC() },
"2006-01-02T15:04:05.000Z07:00",
)
)
// JSONFileLogger represents a logger that writes JSON to a file.
type JSONFileLogger struct {
logger log.Logger
file *os.File
}
// NewJSONFileLogger returns a new JSONFileLogger.
func NewJSONFileLogger(s string) (*JSONFileLogger, error) {
if s == "" {
return nil, nil
}
f, err := os.OpenFile(s, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0666)
if err != nil {
return nil, errors.Wrap(err, "can't create json logger")
}
return &JSONFileLogger{
logger: log.With(log.NewJSONLogger(f), "ts", timestampFormat),
file: f,
}, nil
}
// Close closes the underlying file.
func (l *JSONFileLogger) Close() error {
return l.file.Close()
}
// Log calls the Log function of the underlying logger.
func (l *JSONFileLogger) Log(i ...interface{}) error {
return l.logger.Log(i...)
}

48
pkg/logging/file_test.go Normal file
View file

@ -0,0 +1,48 @@
// Copyright 2020 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package logging
import (
"errors"
"io/ioutil"
"os"
"regexp"
"testing"
"github.com/prometheus/prometheus/util/testutil"
)
func TestJSONFileLogger_basic(t *testing.T) {
f, err := ioutil.TempFile("", "")
testutil.Ok(t, err)
defer f.Close()
l, err := NewJSONFileLogger(f.Name())
testutil.Ok(t, err)
testutil.Assert(t, l != nil, "logger can't be nil")
l.Log("test", "yes")
r := make([]byte, 1024)
_, err = f.Read(r)
testutil.Ok(t, err)
result, err := regexp.Match(`^{"test":"yes","ts":"[^"]+"}\n`, r)
testutil.Ok(t, err)
testutil.Assert(t, result, "unexpected content: %s", r)
err = l.Close()
testutil.Ok(t, err)
err = l.file.Close()
testutil.Assert(t, errors.Is(err, os.ErrClosed), "file was not closed")
}

View file

@ -10,6 +10,7 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package logging
import (

View file

@ -76,6 +76,8 @@ func GetDefaultEvaluationInterval() int64 {
type engineMetrics struct {
currentQueries prometheus.Gauge
maxConcurrentQueries prometheus.Gauge
queryLogEnabled prometheus.Gauge
queryLogFailures prometheus.Counter
queryQueueTime prometheus.Summary
queryPrepareTime prometheus.Summary
queryInnerEval prometheus.Summary
@ -112,6 +114,13 @@ func (e ErrStorage) Error() string {
return e.Err.Error()
}
// QueryLogger is an interface that can be used to log all the queries logged
// by the engine.
type QueryLogger interface {
Log(...interface{}) error
Close() error
}
// A Query is derived from an a raw query string and can be run against an engine
// it is associated with.
type Query interface {
@ -146,6 +155,10 @@ type query struct {
ng *Engine
}
type queryCtx int
var queryOrigin queryCtx
// Statement implements the Query interface.
func (q *query) Statement() Statement {
return q.stmt
@ -177,18 +190,14 @@ func (q *query) Exec(ctx context.Context) *Result {
}
// Log query in active log.
var queryIndex int
if q.ng.activeQueryTracker != nil {
queryIndex = q.ng.activeQueryTracker.Insert(q.q)
queryIndex := q.ng.activeQueryTracker.Insert(q.q)
defer q.ng.activeQueryTracker.Delete(queryIndex)
}
// Exec query.
res, warnings, err := q.ng.exec(ctx, q)
// Delete query from active log.
if q.ng.activeQueryTracker != nil {
q.ng.activeQueryTracker.Delete(queryIndex)
}
return &Result{Err: err, Value: res, Warnings: warnings}
}
@ -230,6 +239,8 @@ type Engine struct {
gate *gate.Gate
maxSamplesPerQuery int
activeQueryTracker *ActiveQueryTracker
queryLogger QueryLogger
queryLoggerLock sync.RWMutex
}
// NewEngine returns a new engine.
@ -245,6 +256,18 @@ func NewEngine(opts EngineOpts) *Engine {
Name: "queries",
Help: "The current number of queries being executed or waiting.",
}),
queryLogEnabled: prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "query_log_enabled",
Help: "State of the query log.",
}),
queryLogFailures: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "query_log_failures_total",
Help: "The number of query log failures.",
}),
maxConcurrentQueries: prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: subsystem,
@ -290,6 +313,8 @@ func NewEngine(opts EngineOpts) *Engine {
opts.Reg.MustRegister(
metrics.currentQueries,
metrics.maxConcurrentQueries,
metrics.queryLogEnabled,
metrics.queryLogFailures,
metrics.queryQueueTime,
metrics.queryPrepareTime,
metrics.queryInnerEval,
@ -307,6 +332,31 @@ func NewEngine(opts EngineOpts) *Engine {
}
}
// SetQueryLogger sets the query logger.
func (ng *Engine) SetQueryLogger(l QueryLogger) error {
ng.queryLoggerLock.Lock()
defer ng.queryLoggerLock.Unlock()
if ng.queryLogger != nil {
// An error closing the old file descriptor should
// not make reload fail; only log a warning.
err := ng.queryLogger.Close()
if err != nil {
level.Warn(ng.logger).Log("msg", "error while closing the previous query log file", "err", err)
}
}
ng.queryLogger = l
if l != nil {
ng.metrics.queryLogEnabled.Set(1)
} else {
ng.metrics.queryLogEnabled.Set(0)
}
return nil
}
// NewInstantQuery returns an evaluation query for the given expression at the given time.
func (ng *Engine) NewInstantQuery(q storage.Queryable, qs string, ts time.Time) (Query, error) {
expr, err := ParseExpr(qs)
@ -372,13 +422,41 @@ func (ng *Engine) newTestQuery(f func(context.Context) error) Query {
//
// At this point per query only one EvalStmt is evaluated. Alert and record
// statements are not handled by the Engine.
func (ng *Engine) exec(ctx context.Context, q *query) (Value, storage.Warnings, error) {
func (ng *Engine) exec(ctx context.Context, q *query) (v Value, w storage.Warnings, err error) {
ng.metrics.currentQueries.Inc()
defer ng.metrics.currentQueries.Dec()
ctx, cancel := context.WithTimeout(ctx, ng.timeout)
q.cancel = cancel
defer func() {
ng.queryLoggerLock.RLock()
if l := ng.queryLogger; l != nil {
f := []interface{}{"query", q.q}
if err != nil {
f = append(f, "error", err)
}
if eq, ok := q.Statement().(*EvalStmt); ok {
f = append(f,
"start", formatDate(eq.Start),
"end", formatDate(eq.End),
"step", eq.Interval.String(),
)
}
f = append(f, "stats", stats.NewQueryStats(q.Stats()))
if origin := ctx.Value(queryOrigin); origin != nil {
for k, v := range origin.(map[string]string) {
f = append(f, k, v)
}
}
if err := l.Log(f...); err != nil {
ng.metrics.queryLogFailures.Inc()
level.Error(ng.logger).Log("msg", "can't log query", "err", err)
}
}
ng.queryLoggerLock.RUnlock()
}()
execSpanTimer, ctx := q.stats.GetSpanTimer(ctx, stats.ExecTotalTime)
defer execSpanTimer.Finish()
@ -2000,6 +2078,11 @@ func shouldDropMetricName(op ItemType) bool {
}
}
// NewOriginContext returns a new context with data about the origin attached.
func NewOriginContext(ctx context.Context, data map[string]string) context.Context {
return context.WithValue(ctx, queryOrigin, data)
}
// documentedType returns the internal type to the equivalent
// user facing terminology as defined in the documentation.
func documentedType(t ValueType) string {
@ -2012,3 +2095,7 @@ func documentedType(t ValueType) string {
return string(t)
}
}
func formatDate(t time.Time) string {
return t.UTC().Format("2006-01-02T15:04:05.000Z07:00")
}

View file

@ -1100,3 +1100,136 @@ func TestSubquerySelector(t *testing.T) {
}
}
}
type FakeQueryLogger struct {
closed bool
logs []interface{}
}
func NewFakeQueryLogger() *FakeQueryLogger {
return &FakeQueryLogger{
closed: false,
logs: make([]interface{}, 0),
}
}
func (f *FakeQueryLogger) Close() error {
f.closed = true
return nil
}
func (f *FakeQueryLogger) Log(l ...interface{}) error {
f.logs = append(f.logs, l...)
return nil
}
func TestQueryLogger_basic(t *testing.T) {
opts := EngineOpts{
Logger: nil,
Reg: nil,
MaxConcurrent: 10,
MaxSamples: 10,
Timeout: 10 * time.Second,
}
engine := NewEngine(opts)
queryExec := func() {
ctx, cancelCtx := context.WithCancel(context.Background())
defer cancelCtx()
query := engine.newTestQuery(func(ctx context.Context) error {
return contextDone(ctx, "test statement execution")
})
res := query.Exec(ctx)
testutil.Ok(t, res.Err)
}
// Query works without query log initalized.
queryExec()
f1 := NewFakeQueryLogger()
engine.SetQueryLogger(f1)
queryExec()
for i, field := range []string{"query", "test statement"} {
testutil.Assert(t, f1.logs[i].(string) == field, "expected %v as key, got %v", field, f1.logs[i])
}
l := len(f1.logs)
queryExec()
testutil.Assert(t, 2*l == len(f1.logs), "expected %d fields in logs, got %v", 2*l, len(f1.logs))
// Test that we close the query logger when unsetting it.
testutil.Assert(t, !f1.closed, "expected f1 to be open, got closed")
engine.SetQueryLogger(nil)
testutil.Assert(t, f1.closed, "expected f1 to be closed, got open")
queryExec()
// Test that we close the query logger when swapping.
f2 := NewFakeQueryLogger()
f3 := NewFakeQueryLogger()
engine.SetQueryLogger(f2)
testutil.Assert(t, !f2.closed, "expected f2 to be open, got closed")
queryExec()
engine.SetQueryLogger(f3)
testutil.Assert(t, f2.closed, "expected f2 to be closed, got open")
testutil.Assert(t, !f3.closed, "expected f3 to be open, got closed")
queryExec()
}
func TestQueryLogger_fields(t *testing.T) {
opts := EngineOpts{
Logger: nil,
Reg: nil,
MaxConcurrent: 10,
MaxSamples: 10,
Timeout: 10 * time.Second,
}
engine := NewEngine(opts)
f1 := NewFakeQueryLogger()
engine.SetQueryLogger(f1)
ctx, cancelCtx := context.WithCancel(context.Background())
ctx = NewOriginContext(ctx, map[string]string{"foo": "bar"})
defer cancelCtx()
query := engine.newTestQuery(func(ctx context.Context) error {
return contextDone(ctx, "test statement execution")
})
res := query.Exec(ctx)
testutil.Ok(t, res.Err)
expected := []string{"foo", "bar"}
for i, field := range expected {
v := f1.logs[len(f1.logs)-len(expected)+i].(string)
testutil.Assert(t, field == v, "expected %v as key, got %v", field, v)
}
}
func TestQueryLogger_error(t *testing.T) {
opts := EngineOpts{
Logger: nil,
Reg: nil,
MaxConcurrent: 10,
MaxSamples: 10,
Timeout: 10 * time.Second,
}
engine := NewEngine(opts)
f1 := NewFakeQueryLogger()
engine.SetQueryLogger(f1)
ctx, cancelCtx := context.WithCancel(context.Background())
ctx = NewOriginContext(ctx, map[string]string{"foo": "bar"})
defer cancelCtx()
testErr := errors.New("failure")
query := engine.newTestQuery(func(ctx context.Context) error {
return testErr
})
res := query.Exec(ctx)
testutil.NotOk(t, res.Err, "query should have failed")
for i, field := range []interface{}{"query", "test statement", "error", testErr} {
testutil.Assert(t, f1.logs[i] == field, "expected %v as key, got %v", field, f1.logs[i])
}
}

View file

@ -80,7 +80,7 @@ func logUnfinishedQueries(filename string, filesize int, logger log.Logger) {
func getMMapedFile(filename string, filesize int, logger log.Logger) ([]byte, error) {
file, err := os.OpenFile(filename, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0644)
file, err := os.OpenFile(filename, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0666)
if err != nil {
level.Error(logger).Log("msg", "Error opening query log file", "file", filename, "err", err)
return nil, err

View file

@ -18,6 +18,7 @@ import (
"fmt"
"math"
"math/rand"
"net"
"net/http"
"net/url"
"os"
@ -341,6 +342,11 @@ func (api *API) query(r *http.Request) apiFuncResult {
return apiFuncResult{nil, &apiError{errorBadData, err}, nil, nil}
}
ctx, err = contextFromRequest(ctx, r)
if err != nil {
return apiFuncResult{nil, returnAPIError(err), nil, nil}
}
res := qry.Exec(ctx)
if res.Err != nil {
return apiFuncResult{nil, returnAPIError(res.Err), res.Warnings, qry.Close}
@ -411,6 +417,11 @@ func (api *API) queryRange(r *http.Request) apiFuncResult {
return apiFuncResult{nil, &apiError{errorBadData, err}, nil, nil}
}
ctx, err = contextFromRequest(ctx, r)
if err != nil {
return apiFuncResult{nil, returnAPIError(err), nil, nil}
}
res := qry.Exec(ctx)
if res.Err != nil {
return apiFuncResult{nil, returnAPIError(res.Err), res.Warnings, qry.Close}
@ -1469,3 +1480,11 @@ func marshalPointJSON(ptr unsafe.Pointer, stream *jsoniter.Stream) {
func marshalPointJSONIsEmpty(ptr unsafe.Pointer) bool {
return false
}
func contextFromRequest(ctx context.Context, r *http.Request) (context.Context, error) {
ip, _, err := net.SplitHostPort(r.RemoteAddr)
if err != nil {
return ctx, err
}
return promql.NewOriginContext(ctx, map[string]string{"clientIP": ip}), nil
}

View file

@ -1466,9 +1466,12 @@ func testEndpoints(t *testing.T, api *API, tr *testTargetRetriever, testLabelAPI
if m == http.MethodPost {
r, err := http.NewRequest(m, "http://example.com", strings.NewReader(q.Encode()))
r.Header.Set("Content-Type", "application/x-www-form-urlencoded")
r.RemoteAddr = "127.0.0.1:20201"
return r, err
}
return http.NewRequest(m, fmt.Sprintf("http://example.com?%s", q.Encode()), nil)
r, err := http.NewRequest(m, fmt.Sprintf("http://example.com?%s", q.Encode()), nil)
r.RemoteAddr = "127.0.0.1:20201"
return r, err
}
for i, test := range tests {