prometheus/web/web.go
Fabian Reinartz 7ccd4b39b8 *: implement query params
This adds a parameter to the storage selection interface which allows
query engine(s) to pass information about the operations surrounding a
data selection.
This can for example be used by remote storage backends to infer the
correct downsampling aggregates that need to be provided.
2018-02-13 12:17:22 +01:00

817 lines
22 KiB
Go

// Copyright 2013 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package web
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"io/ioutil"
stdlog "log"
"net"
"net/http"
"net/http/pprof"
"net/url"
"os"
"path"
"path/filepath"
"sort"
"strings"
"sync"
"sync/atomic"
"time"
"google.golang.org/grpc"
pprof_runtime "runtime/pprof"
template_text "text/template"
"github.com/cockroachdb/cmux"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/mwitkow/go-conntrack"
"github.com/opentracing-contrib/go-stdlib/nethttp"
"github.com/opentracing/opentracing-go"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/prometheus/common/route"
"github.com/prometheus/tsdb"
"golang.org/x/net/netutil"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/notifier"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/rules"
"github.com/prometheus/prometheus/scrape"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/template"
"github.com/prometheus/prometheus/util/httputil"
api_v1 "github.com/prometheus/prometheus/web/api/v1"
api_v2 "github.com/prometheus/prometheus/web/api/v2"
"github.com/prometheus/prometheus/web/ui"
)
var localhostRepresentations = []string{"127.0.0.1", "localhost"}
// Handler serves various HTTP endpoints of the Prometheus server
type Handler struct {
logger log.Logger
scrapeManager *scrape.Manager
ruleManager *rules.Manager
queryEngine *promql.Engine
context context.Context
tsdb func() *tsdb.DB
storage storage.Storage
notifier *notifier.Manager
apiV1 *api_v1.API
router *route.Router
quitCh chan struct{}
reloadCh chan chan error
options *Options
config *config.Config
configString string
versionInfo *PrometheusVersion
birth time.Time
cwd string
flagsMap map[string]string
externalLabels model.LabelSet
mtx sync.RWMutex
now func() model.Time
ready uint32 // ready is uint32 rather than boolean to be able to use atomic functions.
}
// ApplyConfig updates the config field of the Handler struct
func (h *Handler) ApplyConfig(conf *config.Config) error {
h.mtx.Lock()
defer h.mtx.Unlock()
h.config = conf
return nil
}
// PrometheusVersion contains build information about Prometheus.
type PrometheusVersion struct {
Version string `json:"version"`
Revision string `json:"revision"`
Branch string `json:"branch"`
BuildUser string `json:"buildUser"`
BuildDate string `json:"buildDate"`
GoVersion string `json:"goVersion"`
}
// Options for the web Handler.
type Options struct {
Context context.Context
TSDB func() *tsdb.DB
Storage storage.Storage
QueryEngine *promql.Engine
ScrapeManager *scrape.Manager
RuleManager *rules.Manager
Notifier *notifier.Manager
Version *PrometheusVersion
Flags map[string]string
ListenAddress string
ReadTimeout time.Duration
MaxConnections int
ExternalURL *url.URL
RoutePrefix string
MetricsPath string
UseLocalAssets bool
UserAssetsPath string
ConsoleTemplatesPath string
ConsoleLibrariesPath string
EnableLifecycle bool
EnableAdminAPI bool
}
// New initializes a new web Handler.
func New(logger log.Logger, o *Options) *Handler {
router := route.New()
cwd, err := os.Getwd()
if err != nil {
cwd = "<error retrieving current working directory>"
}
if logger == nil {
logger = log.NewNopLogger()
}
h := &Handler{
logger: logger,
router: router,
quitCh: make(chan struct{}),
reloadCh: make(chan chan error),
options: o,
versionInfo: o.Version,
birth: time.Now(),
cwd: cwd,
flagsMap: o.Flags,
context: o.Context,
scrapeManager: o.ScrapeManager,
ruleManager: o.RuleManager,
queryEngine: o.QueryEngine,
tsdb: o.TSDB,
storage: o.Storage,
notifier: o.Notifier,
now: model.Now,
ready: 0,
}
h.apiV1 = api_v1.NewAPI(h.queryEngine, h.storage, h.scrapeManager, h.notifier,
func() config.Config {
h.mtx.RLock()
defer h.mtx.RUnlock()
return *h.config
},
h.testReady,
h.options.TSDB,
h.options.EnableAdminAPI,
)
if o.RoutePrefix != "/" {
// If the prefix is missing for the root path, prepend it.
router.Get("/", func(w http.ResponseWriter, r *http.Request) {
http.Redirect(w, r, o.RoutePrefix, http.StatusFound)
})
router = router.WithPrefix(o.RoutePrefix)
}
instrh := prometheus.InstrumentHandler
instrf := prometheus.InstrumentHandlerFunc
readyf := h.testReady
router.Get("/", func(w http.ResponseWriter, r *http.Request) {
http.Redirect(w, r, path.Join(o.ExternalURL.Path, "/graph"), http.StatusFound)
})
router.Get("/alerts", readyf(instrf("alerts", h.alerts)))
router.Get("/graph", readyf(instrf("graph", h.graph)))
router.Get("/status", readyf(instrf("status", h.status)))
router.Get("/flags", readyf(instrf("flags", h.flags)))
router.Get("/config", readyf(instrf("config", h.serveConfig)))
router.Get("/rules", readyf(instrf("rules", h.rules)))
router.Get("/targets", readyf(instrf("targets", h.targets)))
router.Get("/version", readyf(instrf("version", h.version)))
router.Get("/service-discovery", readyf(instrf("servicediscovery", h.serviceDiscovery)))
router.Get("/heap", instrf("heap", h.dumpHeap))
router.Get("/metrics", prometheus.Handler().ServeHTTP)
router.Get("/federate", readyf(instrh("federate", httputil.CompressionHandler{
Handler: http.HandlerFunc(h.federation),
})))
router.Get("/consoles/*filepath", readyf(instrf("consoles", h.consoles)))
router.Get("/static/*filepath", instrf("static", h.serveStaticAsset))
if o.UserAssetsPath != "" {
router.Get("/user/*filepath", instrf("user", route.FileServe(o.UserAssetsPath)))
}
if o.EnableLifecycle {
router.Post("/-/quit", h.quit)
router.Post("/-/reload", h.reload)
} else {
router.Post("/-/quit", func(w http.ResponseWriter, _ *http.Request) {
w.WriteHeader(http.StatusForbidden)
w.Write([]byte("Lifecycle APIs are not enabled"))
})
router.Post("/-/reload", func(w http.ResponseWriter, _ *http.Request) {
w.WriteHeader(http.StatusForbidden)
w.Write([]byte("Lifecycle APIs are not enabled"))
})
}
router.Get("/-/quit", func(w http.ResponseWriter, _ *http.Request) {
w.WriteHeader(http.StatusMethodNotAllowed)
w.Write([]byte("Only POST requests allowed"))
})
router.Get("/-/reload", func(w http.ResponseWriter, _ *http.Request) {
w.WriteHeader(http.StatusMethodNotAllowed)
w.Write([]byte("Only POST requests allowed"))
})
router.Get("/debug/*subpath", serveDebug)
router.Post("/debug/*subpath", serveDebug)
router.Get("/-/healthy", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
fmt.Fprintf(w, "Prometheus is Healthy.\n")
})
router.Get("/-/ready", readyf(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
fmt.Fprintf(w, "Prometheus is Ready.\n")
}))
return h
}
var corsHeaders = map[string]string{
"Access-Control-Allow-Headers": "Accept, Authorization, Content-Type, Origin",
"Access-Control-Allow-Methods": "GET, OPTIONS",
"Access-Control-Allow-Origin": "*",
"Access-Control-Expose-Headers": "Date",
}
// Enables cross-site script calls.
func setCORS(w http.ResponseWriter) {
for h, v := range corsHeaders {
w.Header().Set(h, v)
}
}
func serveDebug(w http.ResponseWriter, req *http.Request) {
ctx := req.Context()
subpath := route.Param(ctx, "subpath")
if subpath == "/pprof" {
http.Redirect(w, req, req.URL.Path+"/", http.StatusMovedPermanently)
return
}
if !strings.HasPrefix(subpath, "/pprof/") {
http.NotFound(w, req)
return
}
subpath = strings.TrimPrefix(subpath, "/pprof/")
switch subpath {
case "cmdline":
pprof.Cmdline(w, req)
case "profile":
pprof.Profile(w, req)
case "symbol":
pprof.Symbol(w, req)
case "trace":
pprof.Trace(w, req)
default:
req.URL.Path = "/debug/pprof/" + subpath
pprof.Index(w, req)
}
}
func (h *Handler) serveStaticAsset(w http.ResponseWriter, req *http.Request) {
fp := route.Param(req.Context(), "filepath")
fp = filepath.Join("web/ui/static", fp)
info, err := ui.AssetInfo(fp)
if err != nil {
level.Warn(h.logger).Log("msg", "Could not get file info", "err", err, "file", fp)
w.WriteHeader(http.StatusNotFound)
return
}
file, err := ui.Asset(fp)
if err != nil {
if err != io.EOF {
level.Warn(h.logger).Log("msg", "Could not get file", "err", err, "file", fp)
}
w.WriteHeader(http.StatusNotFound)
return
}
http.ServeContent(w, req, info.Name(), info.ModTime(), bytes.NewReader(file))
}
// Ready sets Handler to be ready.
func (h *Handler) Ready() {
atomic.StoreUint32(&h.ready, 1)
}
// Verifies whether the server is ready or not.
func (h *Handler) isReady() bool {
ready := atomic.LoadUint32(&h.ready)
if ready == 0 {
return false
}
return true
}
// Checks if server is ready, calls f if it is, returns 503 if it is not.
func (h *Handler) testReady(f http.HandlerFunc) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
if h.isReady() {
f(w, r)
} else {
w.WriteHeader(http.StatusServiceUnavailable)
fmt.Fprintf(w, "Service Unavailable")
}
}
}
// Checks if server is ready, calls f if it is, returns 503 if it is not.
func (h *Handler) testReadyHandler(f http.Handler) http.HandlerFunc {
return h.testReady(f.ServeHTTP)
}
// Quit returns the receive-only quit channel.
func (h *Handler) Quit() <-chan struct{} {
return h.quitCh
}
// Reload returns the receive-only channel that signals configuration reload requests.
func (h *Handler) Reload() <-chan chan error {
return h.reloadCh
}
// Run serves the HTTP endpoints.
func (h *Handler) Run(ctx context.Context) error {
level.Info(h.logger).Log("msg", "Start listening for connections", "address", h.options.ListenAddress)
listener, err := net.Listen("tcp", h.options.ListenAddress)
if err != nil {
return err
}
listener = netutil.LimitListener(listener, h.options.MaxConnections)
// Monitor incoming connections with conntrack.
listener = conntrack.NewListener(listener,
conntrack.TrackWithName("http"),
conntrack.TrackWithTracing())
var (
m = cmux.New(listener)
grpcl = m.Match(cmux.HTTP2HeaderField("content-type", "application/grpc"))
httpl = m.Match(cmux.HTTP1Fast())
grpcSrv = grpc.NewServer()
)
av2 := api_v2.New(
time.Now,
h.options.TSDB,
h.options.QueryEngine,
h.options.Storage.Querier,
func() []*scrape.Target {
return h.options.ScrapeManager.Targets()
},
func() []*url.URL {
return h.options.Notifier.Alertmanagers()
},
h.options.EnableAdminAPI,
)
av2.RegisterGRPC(grpcSrv)
hh, err := av2.HTTPHandler(h.options.ListenAddress)
if err != nil {
return err
}
hhFunc := h.testReadyHandler(hh)
operationName := nethttp.OperationNameFunc(func(r *http.Request) string {
return fmt.Sprintf("%s %s", r.Method, r.URL.Path)
})
mux := http.NewServeMux()
mux.Handle("/", h.router)
av1 := route.New()
h.apiV1.Register(av1)
apiPath := "/api"
if h.options.RoutePrefix != "/" {
apiPath = h.options.RoutePrefix + apiPath
level.Info(h.logger).Log("msg", "router prefix", "prefix", h.options.RoutePrefix)
}
mux.Handle(apiPath+"/v1/", http.StripPrefix(apiPath+"/v1", av1))
mux.Handle(apiPath+"/", http.StripPrefix(apiPath,
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
setCORS(w)
hhFunc(w, r)
}),
))
errlog := stdlog.New(log.NewStdlibAdapter(level.Error(h.logger)), "", 0)
httpSrv := &http.Server{
Handler: nethttp.Middleware(opentracing.GlobalTracer(), mux, operationName),
ErrorLog: errlog,
ReadTimeout: h.options.ReadTimeout,
}
go func() {
if err := httpSrv.Serve(httpl); err != nil {
level.Warn(h.logger).Log("msg", "error serving HTTP", "err", err)
}
}()
go func() {
if err := grpcSrv.Serve(grpcl); err != nil {
level.Warn(h.logger).Log("msg", "error serving gRPC", "err", err)
}
}()
errCh := make(chan error)
go func() {
errCh <- m.Serve()
}()
select {
case e := <-errCh:
return e
case <-ctx.Done():
httpSrv.Shutdown(ctx)
grpcSrv.GracefulStop()
return nil
}
}
func (h *Handler) alerts(w http.ResponseWriter, r *http.Request) {
alerts := h.ruleManager.AlertingRules()
alertsSorter := byAlertStateAndNameSorter{alerts: alerts}
sort.Sort(alertsSorter)
alertStatus := AlertStatus{
AlertingRules: alertsSorter.alerts,
AlertStateToRowClass: map[rules.AlertState]string{
rules.StateInactive: "success",
rules.StatePending: "warning",
rules.StateFiring: "danger",
},
}
h.executeTemplate(w, "alerts.html", alertStatus)
}
func (h *Handler) consoles(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
name := route.Param(ctx, "filepath")
file, err := http.Dir(h.options.ConsoleTemplatesPath).Open(name)
if err != nil {
http.Error(w, err.Error(), http.StatusNotFound)
return
}
text, err := ioutil.ReadAll(file)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
// Provide URL parameters as a map for easy use. Advanced users may have need for
// parameters beyond the first, so provide RawParams.
rawParams, err := url.ParseQuery(r.URL.RawQuery)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
params := map[string]string{}
for k, v := range rawParams {
params[k] = v[0]
}
data := struct {
RawParams url.Values
Params map[string]string
Path string
}{
RawParams: rawParams,
Params: params,
Path: strings.TrimLeft(name, "/"),
}
tmpl := template.NewTemplateExpander(
h.context,
string(text),
"__console_"+name,
data,
h.now(),
template.QueryFunc(rules.EngineQueryFunc(h.queryEngine, h.storage)),
h.options.ExternalURL,
)
filenames, err := filepath.Glob(h.options.ConsoleLibrariesPath + "/*.lib")
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
result, err := tmpl.ExpandHTML(filenames)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
io.WriteString(w, result)
}
func (h *Handler) graph(w http.ResponseWriter, r *http.Request) {
h.executeTemplate(w, "graph.html", nil)
}
func (h *Handler) status(w http.ResponseWriter, r *http.Request) {
h.executeTemplate(w, "status.html", struct {
Birth time.Time
CWD string
Version *PrometheusVersion
Alertmanagers []*url.URL
}{
Birth: h.birth,
CWD: h.cwd,
Version: h.versionInfo,
Alertmanagers: h.notifier.Alertmanagers(),
})
}
func (h *Handler) flags(w http.ResponseWriter, r *http.Request) {
h.executeTemplate(w, "flags.html", h.flagsMap)
}
func (h *Handler) serveConfig(w http.ResponseWriter, r *http.Request) {
h.mtx.RLock()
defer h.mtx.RUnlock()
h.executeTemplate(w, "config.html", h.config.String())
}
func (h *Handler) rules(w http.ResponseWriter, r *http.Request) {
h.executeTemplate(w, "rules.html", h.ruleManager)
}
func (h *Handler) serviceDiscovery(w http.ResponseWriter, r *http.Request) {
var index []string
targets := h.scrapeManager.TargetMap()
for job := range targets {
index = append(index, job)
}
sort.Strings(index)
scrapeConfigData := struct {
Index []string
Targets map[string][]*scrape.Target
}{
Index: index,
Targets: targets,
}
h.executeTemplate(w, "service-discovery.html", scrapeConfigData)
}
func (h *Handler) targets(w http.ResponseWriter, r *http.Request) {
// Bucket targets by job label
tps := map[string][]*scrape.Target{}
for _, t := range h.scrapeManager.Targets() {
job := t.Labels().Get(model.JobLabel)
tps[job] = append(tps[job], t)
}
for _, targets := range tps {
sort.Slice(targets, func(i, j int) bool {
return targets[i].Labels().Get(labels.InstanceName) < targets[j].Labels().Get(labels.InstanceName)
})
}
h.executeTemplate(w, "targets.html", struct {
TargetPools map[string][]*scrape.Target
}{
TargetPools: tps,
})
}
func (h *Handler) version(w http.ResponseWriter, r *http.Request) {
dec := json.NewEncoder(w)
if err := dec.Encode(h.versionInfo); err != nil {
http.Error(w, fmt.Sprintf("error encoding JSON: %s", err), http.StatusInternalServerError)
}
}
func (h *Handler) quit(w http.ResponseWriter, r *http.Request) {
fmt.Fprintf(w, "Requesting termination... Goodbye!")
close(h.quitCh)
}
func (h *Handler) reload(w http.ResponseWriter, r *http.Request) {
rc := make(chan error)
h.reloadCh <- rc
if err := <-rc; err != nil {
http.Error(w, fmt.Sprintf("failed to reload config: %s", err), http.StatusInternalServerError)
}
}
func (h *Handler) consolesPath() string {
if _, err := os.Stat(h.options.ConsoleTemplatesPath + "/index.html"); !os.IsNotExist(err) {
return h.options.ExternalURL.Path + "/consoles/index.html"
}
if h.options.UserAssetsPath != "" {
if _, err := os.Stat(h.options.UserAssetsPath + "/index.html"); !os.IsNotExist(err) {
return h.options.ExternalURL.Path + "/user/index.html"
}
}
return ""
}
func tmplFuncs(consolesPath string, opts *Options) template_text.FuncMap {
return template_text.FuncMap{
"since": func(t time.Time) time.Duration {
return time.Since(t) / time.Millisecond * time.Millisecond
},
"consolesPath": func() string { return consolesPath },
"pathPrefix": func() string { return opts.ExternalURL.Path },
"buildVersion": func() string { return opts.Version.Revision },
"stripLabels": func(lset map[string]string, labels ...string) map[string]string {
for _, ln := range labels {
delete(lset, ln)
}
return lset
},
"globalURL": func(u *url.URL) *url.URL {
host, port, err := net.SplitHostPort(u.Host)
if err != nil {
return u
}
for _, lhr := range localhostRepresentations {
if host == lhr {
_, ownPort, err := net.SplitHostPort(opts.ListenAddress)
if err != nil {
return u
}
if port == ownPort {
// Only in the case where the target is on localhost and its port is
// the same as the one we're listening on, we know for sure that
// we're monitoring our own process and that we need to change the
// scheme, hostname, and port to the externally reachable ones as
// well. We shouldn't need to touch the path at all, since if a
// path prefix is defined, the path under which we scrape ourselves
// should already contain the prefix.
u.Scheme = opts.ExternalURL.Scheme
u.Host = opts.ExternalURL.Host
} else {
// Otherwise, we only know that localhost is not reachable
// externally, so we replace only the hostname by the one in the
// external URL. It could be the wrong hostname for the service on
// this port, but it's still the best possible guess.
host, _, err := net.SplitHostPort(opts.ExternalURL.Host)
if err != nil {
return u
}
u.Host = host + ":" + port
}
break
}
}
return u
},
"numHealthy": func(pool []*scrape.Target) int {
alive := len(pool)
for _, p := range pool {
if p.Health() != scrape.HealthGood {
alive--
}
}
return alive
},
"healthToClass": func(th scrape.TargetHealth) string {
switch th {
case scrape.HealthUnknown:
return "warning"
case scrape.HealthGood:
return "success"
default:
return "danger"
}
},
"alertStateToClass": func(as rules.AlertState) string {
switch as {
case rules.StateInactive:
return "success"
case rules.StatePending:
return "warning"
case rules.StateFiring:
return "danger"
default:
panic("unknown alert state")
}
},
}
}
func (h *Handler) getTemplate(name string) (string, error) {
baseTmpl, err := ui.Asset("web/ui/templates/_base.html")
if err != nil {
return "", fmt.Errorf("error reading base template: %s", err)
}
pageTmpl, err := ui.Asset(filepath.Join("web/ui/templates", name))
if err != nil {
return "", fmt.Errorf("error reading page template %s: %s", name, err)
}
return string(baseTmpl) + string(pageTmpl), nil
}
func (h *Handler) executeTemplate(w http.ResponseWriter, name string, data interface{}) {
text, err := h.getTemplate(name)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
}
tmpl := template.NewTemplateExpander(
h.context,
text,
name,
data,
h.now(),
template.QueryFunc(rules.EngineQueryFunc(h.queryEngine, h.storage)),
h.options.ExternalURL,
)
tmpl.Funcs(tmplFuncs(h.consolesPath(), h.options))
result, err := tmpl.ExpandHTML(nil)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
io.WriteString(w, result)
}
func (h *Handler) dumpHeap(w http.ResponseWriter, r *http.Request) {
target := fmt.Sprintf("/tmp/%d.heap", time.Now().Unix())
f, err := os.Create(target)
if err != nil {
level.Error(h.logger).Log("msg", "Could not dump heap", "err", err)
}
fmt.Fprintf(w, "Writing to %s...", target)
defer f.Close()
pprof_runtime.WriteHeapProfile(f)
fmt.Fprintf(w, "Done")
}
// AlertStatus bundles alerting rules and the mapping of alert states to row classes.
type AlertStatus struct {
AlertingRules []*rules.AlertingRule
AlertStateToRowClass map[rules.AlertState]string
}
type byAlertStateAndNameSorter struct {
alerts []*rules.AlertingRule
}
func (s byAlertStateAndNameSorter) Len() int {
return len(s.alerts)
}
func (s byAlertStateAndNameSorter) Less(i, j int) bool {
return s.alerts[i].State() > s.alerts[j].State() ||
(s.alerts[i].State() == s.alerts[j].State() &&
s.alerts[i].Name() < s.alerts[j].Name())
}
func (s byAlertStateAndNameSorter) Swap(i, j int) {
s.alerts[i], s.alerts[j] = s.alerts[j], s.alerts[i]
}