*: migrate to go-kit/log

This commit is contained in:
Fabian Reinartz 2017-08-11 20:45:52 +02:00 committed by Goutham Veeramachaneni
parent a8887f46dc
commit d21f149745
55 changed files with 495 additions and 361 deletions

View file

@ -27,10 +27,13 @@ import (
"syscall"
"time"
k8s_runtime "k8s.io/apimachinery/pkg/util/runtime"
"github.com/asaskevich/govalidator"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/log"
"github.com/prometheus/common/model"
"github.com/prometheus/common/version"
"golang.org/x/net/context"
@ -38,6 +41,7 @@ import (
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/notifier"
"github.com/prometheus/prometheus/pkg/promlog"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/retrieval"
"github.com/prometheus/prometheus/rules"
@ -86,8 +90,7 @@ func main() {
prometheusURL string
logFormat string
logLevel string
logLevel promlog.AllowedLevel
}{
notifier: notifier.Options{
Registerer: prometheus.DefaultRegisterer,
@ -100,13 +103,8 @@ func main() {
a.HelpFlag.Short('h')
a.Flag("log.level",
"Only log messages with the given severity or above. One of: [debug, info, warn, error, fatal]").
Default("info").StringVar(&cfg.logLevel)
a.Flag("log.format",
`Set the log target and format. Example: "logger:syslog?appname=bob&local=7" or "logger:stdout?json=true"`).
Default("logger:stderr").StringVar(&cfg.logFormat)
a.Flag(promlog.LevelFlagName, promlog.LevelFlagHelp).
Default("info").SetValue(&cfg.logLevel)
a.Flag("config.file", "Prometheus configuration file path.").
Default("prometheus.yml").StringVar(&cfg.configFile)
@ -203,13 +201,20 @@ func main() {
cfg.queryEngine.Timeout = time.Duration(cfg.queryTimeout)
logger := log.NewLogger(os.Stdout)
logger.SetLevel(cfg.logLevel)
logger.SetFormat(cfg.logFormat)
logger := promlog.New(cfg.logLevel)
logger.Infoln("Starting prometheus", version.Info())
logger.Infoln("Build context", version.BuildContext())
logger.Infoln("Host details", Uname())
// XXX(fabxc): The Kubernetes does background logging which we can only customize by modifying
// a global variable.
// Ultimately, here is the best place to set it.
k8s_runtime.ErrorHandlers = []func(error){
func(err error) {
level.Error(log.With(logger, "component", "k8s_client_runtime")).Log("err", err)
},
}
level.Info(logger).Log("msg", "Starting prometheus", "version", version.Info())
level.Info(logger).Log("build_context", version.BuildContext())
level.Info(logger).Log("host_details", Uname())
var (
// sampleAppender = storage.Fanout{}
@ -221,22 +226,30 @@ func main() {
hup := make(chan os.Signal)
hupReady := make(chan bool)
signal.Notify(hup, syscall.SIGHUP)
logger.Infoln("Starting tsdb")
localStorage, err := tsdb.Open(cfg.localStoragePath, prometheus.DefaultRegisterer, &cfg.tsdb)
level.Info(logger).Log("msg", "Starting TSDB")
localStorage, err := tsdb.Open(
cfg.localStoragePath,
log.With(logger, "component", "tsdb"),
prometheus.DefaultRegisterer,
&cfg.tsdb,
)
if err != nil {
log.Errorf("Opening storage failed: %s", err)
level.Error(logger).Log("msg", "Opening TSDB failed", "err", err)
os.Exit(1)
}
logger.Infoln("tsdb started")
remoteStorage := &remote.Storage{}
level.Info(logger).Log("msg", "TSDB succesfully started")
remoteStorage := remote.NewStorage(log.With(logger, "component", "remote"))
reloadables = append(reloadables, remoteStorage)
fanoutStorage := storage.NewFanout(tsdb.Adapter(localStorage), remoteStorage)
fanoutStorage := storage.NewFanout(logger, tsdb.Adapter(localStorage), remoteStorage)
cfg.queryEngine.Logger = logger
cfg.queryEngine.Logger = log.With(logger, "component", "query engine")
var (
notifier = notifier.New(&cfg.notifier, logger)
targetManager = retrieval.NewTargetManager(fanoutStorage, logger)
notifier = notifier.New(&cfg.notifier, log.With(logger, "component", "notifier"))
targetManager = retrieval.NewTargetManager(fanoutStorage, log.With(logger, "component", "target manager"))
queryEngine = promql.NewEngine(fanoutStorage, &cfg.queryEngine)
ctx, cancelCtx = context.WithCancel(context.Background())
)
@ -247,7 +260,7 @@ func main() {
QueryEngine: queryEngine,
Context: ctx,
ExternalURL: cfg.web.ExternalURL,
Logger: logger,
Logger: log.With(logger, "component", "rule manager"),
})
cfg.web.Context = ctx
@ -271,12 +284,12 @@ func main() {
cfg.web.Flags[f.Name] = f.Value.String()
}
webHandler := web.New(&cfg.web)
webHandler := web.New(log.With(logger, "componennt", "web"), &cfg.web)
reloadables = append(reloadables, targetManager, ruleManager, webHandler, notifier)
if err := reloadConfig(cfg.configFile, logger, reloadables...); err != nil {
logger.Errorf("Error loading config: %s", err)
level.Error(logger).Log("msg", "Error loading config", "err", err)
os.Exit(1)
}
@ -289,11 +302,11 @@ func main() {
select {
case <-hup:
if err := reloadConfig(cfg.configFile, logger, reloadables...); err != nil {
logger.Errorf("Error reloading config: %s", err)
level.Error(logger).Log("msg", "Error reloading config", "err", err)
}
case rc := <-webHandler.Reload():
if err := reloadConfig(cfg.configFile, logger, reloadables...); err != nil {
logger.Errorf("Error reloading config: %s", err)
level.Error(logger).Log("msg", "Error reloading config", "err", err)
rc <- err
} else {
rc <- nil
@ -305,7 +318,7 @@ func main() {
// Start all components. The order is NOT arbitrary.
defer func() {
if err := fanoutStorage.Close(); err != nil {
log.Errorln("Error stopping storage:", err)
level.Error(logger).Log("msg", "Closing storage(s) failed", "err", err)
}
}()
@ -337,20 +350,20 @@ func main() {
// Set web server to ready.
webHandler.Ready()
log.Info("Server is ready to receive requests.")
level.Info(logger).Log("msg", "Server is ready to receive requests.")
term := make(chan os.Signal)
signal.Notify(term, os.Interrupt, syscall.SIGTERM)
select {
case <-term:
logger.Warn("Received SIGTERM, exiting gracefully...")
level.Warn(logger).Log("msg", "Received SIGTERM, exiting gracefully...")
case <-webHandler.Quit():
logger.Warn("Received termination request via web service, exiting gracefully...")
level.Warn(logger).Log("msg", "Received termination request via web service, exiting gracefully...")
case err := <-errc:
logger.Errorln("Error starting web server, exiting gracefully:", err)
level.Error(logger).Log("msg", "Error starting web server, exiting gracefully", "err", err)
}
logger.Info("See you next time!")
level.Info(logger).Log("msg", "See you next time!")
}
// Reloadable things can change their internal state to match a new config
@ -360,7 +373,8 @@ type Reloadable interface {
}
func reloadConfig(filename string, logger log.Logger, rls ...Reloadable) (err error) {
logger.Infof("Loading configuration file %s", filename)
level.Info(logger).Log("msg", "Loading configuration file", "filename", filename)
defer func() {
if err == nil {
configSuccess.Set(1)
@ -378,7 +392,7 @@ func reloadConfig(filename string, logger log.Logger, rls ...Reloadable) (err er
failed := false
for _, rl := range rls {
if err := rl.ApplyConfig(conf); err != nil {
logger.Error("Failed to apply configuration: ", err)
level.Error(logger).Log("msg", "Failed to apply configuration", "err", err)
failed = true
}
}

View file

@ -23,8 +23,9 @@ import (
"github.com/Azure/azure-sdk-for-go/arm/network"
"github.com/Azure/go-autorest/autorest/azure"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/log"
"github.com/prometheus/common/model"
"golang.org/x/net/context"
@ -71,6 +72,9 @@ type Discovery struct {
// NewDiscovery returns a new AzureDiscovery which periodically refreshes its targets.
func NewDiscovery(cfg *config.AzureSDConfig, logger log.Logger) *Discovery {
if logger == nil {
logger = log.NewNopLogger()
}
return &Discovery{
cfg: cfg,
interval: time.Duration(cfg.RefreshInterval),
@ -93,7 +97,7 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
tg, err := d.refresh()
if err != nil {
d.logger.Errorf("unable to refresh during Azure discovery: %s", err)
level.Error(d.logger).Log("msg", "Unable to refresh during Azure discovery", "err", err)
} else {
select {
case <-ctx.Done():
@ -149,7 +153,7 @@ func newAzureResourceFromID(id string, logger log.Logger) (azureResource, error)
s := strings.Split(id, "/")
if len(s) != 9 {
err := fmt.Errorf("invalid ID '%s'. Refusing to create azureResource", id)
logger.Error(err)
level.Error(logger).Log("err", err)
return azureResource{}, err
}
return azureResource{
@ -159,6 +163,8 @@ func newAzureResourceFromID(id string, logger log.Logger) (azureResource, error)
}
func (d *Discovery) refresh() (tg *config.TargetGroup, err error) {
defer level.Debug(d.logger).Log("msg", "Azure discovery completed")
t0 := time.Now()
defer func() {
azureSDRefreshDuration.Observe(time.Since(t0).Seconds())
@ -187,7 +193,7 @@ func (d *Discovery) refresh() (tg *config.TargetGroup, err error) {
}
machines = append(machines, *result.Value...)
}
d.logger.Debugf("Found %d virtual machines during Azure discovery.", len(machines))
level.Debug(d.logger).Log("msg", "Found virtual machines during Azure discovery.", "count", len(machines))
// We have the slice of machines. Now turn them into targets.
// Doing them in go routines because the network interface calls are slow.
@ -228,7 +234,7 @@ func (d *Discovery) refresh() (tg *config.TargetGroup, err error) {
}
networkInterface, err := client.nic.Get(r.ResourceGroup, r.Name, "")
if err != nil {
d.logger.Errorf("Unable to get network interface %s: %s", r.Name, err)
level.Error(d.logger).Log("msg", "Unable to get network interface", "name", r.Name, "err", err)
ch <- target{labelSet: nil, err: err}
// Get out of this routine because we cannot continue without a network interface.
return
@ -239,7 +245,7 @@ func (d *Discovery) refresh() (tg *config.TargetGroup, err error) {
// yet support this. On deallocated machines, this value happens to be nil so it
// is a cheap and easy way to determine if a machine is allocated or not.
if networkInterface.Properties.Primary == nil {
d.logger.Debugf("Virtual machine %s is deallocated. Skipping during Azure SD.", *vm.Name)
level.Debug(d.logger).Log("msg", "Skipping deallocated virtual machine", "machine", *vm.Name)
ch <- target{}
return
}
@ -274,6 +280,5 @@ func (d *Discovery) refresh() (tg *config.TargetGroup, err error) {
}
}
d.logger.Debugf("Azure discovery completed.")
return tg, nil
}

View file

@ -21,9 +21,10 @@ import (
"strings"
"time"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
consul "github.com/hashicorp/consul/api"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/log"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/util/httputil"
@ -97,6 +98,10 @@ type Discovery struct {
// NewDiscovery returns a new Discovery for the given config.
func NewDiscovery(conf *config.ConsulSDConfig, logger log.Logger) (*Discovery, error) {
if logger == nil {
logger = log.NewNopLogger()
}
tls, err := httputil.NewTLSConfig(conf.TLSConfig)
if err != nil {
return nil, err
@ -168,7 +173,7 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
}
if err != nil {
d.logger.Errorf("Error refreshing service list: %s", err)
level.Error(d.logger).Log("msg", "Error refreshing service list", "err", err)
rpcFailuresCount.Inc()
time.Sleep(retryInterval)
continue
@ -184,7 +189,7 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
if d.clientDatacenter == "" {
info, err := d.client.Agent().Self()
if err != nil {
d.logger.Errorf("Error retrieving datacenter name: %s", err)
level.Error(d.logger).Log("msg", "Error retrieving datacenter name", "err", err)
time.Sleep(retryInterval)
continue
}
@ -265,7 +270,7 @@ func (srv *consulService) watch(ctx context.Context, ch chan<- []*config.TargetG
}
if err != nil {
srv.logger.Errorf("Error refreshing service %s: %s", srv.name, err)
level.Error(srv.logger).Log("msg", "Error refreshing service", "service", srv.name, "err", err)
rpcFailuresCount.Inc()
time.Sleep(retryInterval)
continue

View file

@ -16,14 +16,13 @@ package consul
import (
"testing"
"github.com/prometheus/common/log"
"github.com/prometheus/prometheus/config"
)
func TestConfiguredService(t *testing.T) {
conf := &config.ConsulSDConfig{
Services: []string{"configuredServiceName"}}
consulDiscovery, err := NewDiscovery(conf, log.Base())
consulDiscovery, err := NewDiscovery(conf, nil)
if err != nil {
t.Errorf("Unexpected error when initialising discovery %v", err)
@ -38,7 +37,7 @@ func TestConfiguredService(t *testing.T) {
func TestNonConfiguredService(t *testing.T) {
conf := &config.ConsulSDConfig{}
consulDiscovery, err := NewDiscovery(conf, log.Base())
consulDiscovery, err := NewDiscovery(conf, nil)
if err != nil {
t.Errorf("Unexpected error when initialising discovery %v", err)

View file

@ -18,7 +18,8 @@ import (
"sync"
"time"
"github.com/prometheus/common/log"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/discovery/azure"
"github.com/prometheus/prometheus/discovery/consul"
@ -59,68 +60,68 @@ func ProvidersFromConfig(cfg config.ServiceDiscoveryConfig, logger log.Logger) m
}
for i, c := range cfg.DNSSDConfigs {
app("dns", i, dns.NewDiscovery(c, logger))
app("dns", i, dns.NewDiscovery(c, log.With(logger, "discovery", "dns")))
}
for i, c := range cfg.FileSDConfigs {
app("file", i, file.NewDiscovery(c, logger))
app("file", i, file.NewDiscovery(c, log.With(logger, "discovery", "file")))
}
for i, c := range cfg.ConsulSDConfigs {
k, err := consul.NewDiscovery(c, logger)
k, err := consul.NewDiscovery(c, log.With(logger, "discovery", "consul"))
if err != nil {
logger.Errorf("Cannot create Consul discovery: %s", err)
level.Error(logger).Log("msg", "Cannot create Consul discovery", "err", err)
continue
}
app("consul", i, k)
}
for i, c := range cfg.MarathonSDConfigs {
m, err := marathon.NewDiscovery(c, logger)
m, err := marathon.NewDiscovery(c, log.With(logger, "discovery", "marathon"))
if err != nil {
logger.Errorf("Cannot create Marathon discovery: %s", err)
level.Error(logger).Log("msg", "Cannot create Marathon discovery", "err", err)
continue
}
app("marathon", i, m)
}
for i, c := range cfg.KubernetesSDConfigs {
k, err := kubernetes.New(logger, c)
k, err := kubernetes.New(log.With(logger, "discovery", "k8s"), c)
if err != nil {
logger.Errorf("Cannot create Kubernetes discovery: %s", err)
level.Error(logger).Log("msg", "Cannot create Kubernetes discovery", "err", err)
continue
}
app("kubernetes", i, k)
}
for i, c := range cfg.ServersetSDConfigs {
app("serverset", i, zookeeper.NewServersetDiscovery(c, logger))
app("serverset", i, zookeeper.NewServersetDiscovery(c, log.With(logger, "discovery", "zookeeper")))
}
for i, c := range cfg.NerveSDConfigs {
app("nerve", i, zookeeper.NewNerveDiscovery(c, logger))
app("nerve", i, zookeeper.NewNerveDiscovery(c, log.With(logger, "discovery", "nerve")))
}
for i, c := range cfg.EC2SDConfigs {
app("ec2", i, ec2.NewDiscovery(c, logger))
app("ec2", i, ec2.NewDiscovery(c, log.With(logger, "discovery", "ec2")))
}
for i, c := range cfg.OpenstackSDConfigs {
openstackd, err := openstack.NewDiscovery(c, logger)
openstackd, err := openstack.NewDiscovery(c, log.With(logger, "discovery", "openstack"))
if err != nil {
log.Errorf("Cannot initialize OpenStack discovery: %s", err)
level.Error(logger).Log("msg", "Cannot initialize OpenStack discovery", "err", err)
continue
}
app("openstack", i, openstackd)
}
for i, c := range cfg.GCESDConfigs {
gced, err := gce.NewDiscovery(c, logger)
gced, err := gce.NewDiscovery(c, log.With(logger, "discovery", "gce"))
if err != nil {
logger.Errorf("Cannot initialize GCE discovery: %s", err)
level.Error(logger).Log("msg", "Cannot initialize GCE discovery", "err", err)
continue
}
app("gce", i, gced)
}
for i, c := range cfg.AzureSDConfigs {
app("azure", i, azure.NewDiscovery(c, logger))
app("azure", i, azure.NewDiscovery(c, log.With(logger, "discovery", "azure")))
}
for i, c := range cfg.TritonSDConfigs {
t, err := triton.New(logger.With("sd", "triton"), c)
t, err := triton.New(log.With(logger, "discovery", "trition"), c)
if err != nil {
logger.Errorf("Cannot create Triton discovery: %s", err)
level.Error(logger).Log("msg", "Cannot create Triton discovery", "err", err)
continue
}
app("triton", i, t)

View file

@ -16,7 +16,6 @@ package discovery
import (
"testing"
"github.com/prometheus/common/log"
"github.com/prometheus/prometheus/config"
"golang.org/x/net/context"
yaml "gopkg.in/yaml.v2"
@ -54,7 +53,7 @@ static_configs:
go ts.Run(ctx)
ts.UpdateProviders(ProvidersFromConfig(*cfg, log.Base()))
ts.UpdateProviders(ProvidersFromConfig(*cfg, nil))
<-called
verifyPresence(ts.tgroups, "static/0/0", true)
@ -68,7 +67,7 @@ static_configs:
t.Fatalf("Unable to load YAML config sTwo: %s", err)
}
ts.UpdateProviders(ProvidersFromConfig(*cfg, log.Base()))
ts.UpdateProviders(ProvidersFromConfig(*cfg, nil))
<-called
verifyPresence(ts.tgroups, "static/0/0", true)

View file

@ -20,9 +20,10 @@ import (
"sync"
"time"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/miekg/dns"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/log"
"github.com/prometheus/common/model"
"golang.org/x/net/context"
@ -71,6 +72,10 @@ type Discovery struct {
// NewDiscovery returns a new Discovery which periodically refreshes its targets.
func NewDiscovery(conf *config.DNSSDConfig, logger log.Logger) *Discovery {
if logger == nil {
logger = log.NewNopLogger()
}
qtype := dns.TypeSRV
switch strings.ToUpper(conf.Type) {
case "A":
@ -114,7 +119,7 @@ func (d *Discovery) refreshAll(ctx context.Context, ch chan<- []*config.TargetGr
for _, name := range d.names {
go func(n string) {
if err := d.refresh(ctx, n, ch); err != nil {
d.logger.Errorf("Error refreshing DNS targets: %s", err)
level.Error(d.logger).Log("msg", "Error refreshing DNS targets", "err", err)
}
wg.Done()
}(name)
@ -149,7 +154,7 @@ func (d *Discovery) refresh(ctx context.Context, name string, ch chan<- []*confi
case *dns.AAAA:
target = hostPort(addr.AAAA.String(), d.port)
default:
d.logger.Warnf("%q is not a valid SRV record", record)
level.Warn(d.logger).Log("msg", "Invalid SRV record", "record", record)
continue
}
@ -183,11 +188,7 @@ func lookupAll(name string, qtype uint16, logger log.Logger) (*dns.Msg, error) {
for _, lname := range conf.NameList(name) {
response, err = lookup(lname, qtype, client, servAddr, false)
if err != nil {
logger.
With("server", server).
With("name", name).
With("reason", err).
Warn("DNS resolution failed.")
level.Warn(logger).Log("msg", "DNS resolution failed", "server", server, "name", name, "err", err)
continue
}
if len(response.Answer) > 0 {

View file

@ -22,8 +22,9 @@ import (
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/log"
"github.com/prometheus/common/model"
"golang.org/x/net/context"
@ -81,6 +82,9 @@ func NewDiscovery(conf *config.EC2SDConfig, logger log.Logger) *Discovery {
if conf.AccessKey == "" && conf.SecretKey == "" {
creds = nil
}
if logger == nil {
logger = log.NewNopLogger()
}
return &Discovery{
aws: &aws.Config{
Region: &conf.Region,
@ -101,7 +105,7 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
// Get an initial set right away.
tg, err := d.refresh()
if err != nil {
d.logger.Error(err)
level.Error(d.logger).Log("msg", "Refresh failed", "err", err)
} else {
select {
case ch <- []*config.TargetGroup{tg}:
@ -115,7 +119,7 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
case <-ticker.C:
tg, err := d.refresh()
if err != nil {
d.logger.Error(err)
level.Error(d.logger).Log("msg", "Refresh failed", "err", err)
continue
}

View file

@ -22,8 +22,9 @@ import (
"strings"
"time"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/log"
"github.com/prometheus/common/model"
"golang.org/x/net/context"
"gopkg.in/fsnotify.v1"
@ -69,6 +70,9 @@ type Discovery struct {
// NewDiscovery returns a new file discovery for the given paths.
func NewDiscovery(conf *config.FileSDConfig, logger log.Logger) *Discovery {
if logger == nil {
logger = log.NewNopLogger()
}
return &Discovery{
paths: conf.Files,
interval: time.Duration(conf.RefreshInterval),
@ -82,7 +86,7 @@ func (d *Discovery) listFiles() []string {
for _, p := range d.paths {
files, err := filepath.Glob(p)
if err != nil {
d.logger.Errorf("Error expanding glob %q: %s", p, err)
level.Error(d.logger).Log("msg", "Error expanding glob", "glob", p, "err", err)
continue
}
paths = append(paths, files...)
@ -103,7 +107,7 @@ func (d *Discovery) watchFiles() {
p = "./"
}
if err := d.watcher.Add(p); err != nil {
d.logger.Errorf("Error adding file watch for %q: %s", p, err)
level.Error(d.logger).Log("msg", "Error adding file watch", "path", p, "err", err)
}
}
}
@ -114,7 +118,7 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
watcher, err := fsnotify.NewWatcher()
if err != nil {
d.logger.Errorf("Error creating file watcher: %s", err)
level.Error(d.logger).Log("msg", "Error adding file watcher", "err", err)
return
}
d.watcher = watcher
@ -152,7 +156,7 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
case err := <-d.watcher.Errors:
if err != nil {
d.logger.Errorf("Error on file watch: %s", err)
level.Error(d.logger).Log("msg", "Error watching file", "err", err)
}
}
}
@ -160,7 +164,7 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
// stop shuts down the file watcher.
func (d *Discovery) stop() {
d.logger.Debugf("Stopping file discovery for %s...", d.paths)
level.Debug(d.logger).Log("msg", "Stopping file discovery...", "paths", d.paths)
done := make(chan struct{})
defer close(done)
@ -178,10 +182,10 @@ func (d *Discovery) stop() {
}
}()
if err := d.watcher.Close(); err != nil {
d.logger.Errorf("Error closing file watcher for %s: %s", d.paths, err)
level.Error(d.logger).Log("msg", "Error closing file watcher", "paths", d.paths, "err", err)
}
d.logger.Debugf("File discovery for %s stopped.", d.paths)
level.Debug(d.logger).Log("File discovery stopped", "paths", d.paths)
}
// refresh reads all files matching the discovery's patterns and sends the respective
@ -197,7 +201,8 @@ func (d *Discovery) refresh(ctx context.Context, ch chan<- []*config.TargetGroup
tgroups, err := readFile(p)
if err != nil {
fileSDReadErrorsCount.Inc()
d.logger.Errorf("Error reading file %q: %s", p, err)
level.Error(d.logger).Log("msg", "Error reading file", "path", p, "err", err)
// Prevent deletion down below.
ref[p] = d.lastRefresh[p]
continue

View file

@ -21,7 +21,6 @@ import (
"testing"
"time"
"github.com/prometheus/common/log"
"github.com/prometheus/common/model"
"golang.org/x/net/context"
@ -47,7 +46,7 @@ func testFileSD(t *testing.T, prefix, ext string, expect bool) {
conf.RefreshInterval = model.Duration(1 * time.Hour)
var (
fsd = NewDiscovery(&conf, log.Base())
fsd = NewDiscovery(&conf, nil)
ch = make(chan []*config.TargetGroup)
ctx, cancel = context.WithCancel(context.Background())
)

View file

@ -21,8 +21,9 @@ import (
"google.golang.org/api/compute/v1"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/log"
"github.com/prometheus/common/model"
"golang.org/x/net/context"
"golang.org/x/oauth2"
@ -81,6 +82,9 @@ type Discovery struct {
// NewDiscovery returns a new Discovery which periodically refreshes its targets.
func NewDiscovery(conf *config.GCESDConfig, logger log.Logger) (*Discovery, error) {
if logger == nil {
logger = log.NewNopLogger()
}
gd := &Discovery{
project: conf.Project,
zone: conf.Zone,
@ -108,7 +112,7 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
// Get an initial set right away.
tg, err := d.refresh()
if err != nil {
d.logger.Error(err)
level.Error(d.logger).Log("msg", "Refresh failed", "err", err)
} else {
select {
case ch <- []*config.TargetGroup{tg}:
@ -124,7 +128,7 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
case <-ticker.C:
tg, err := d.refresh()
if err != nil {
d.logger.Error(err)
level.Error(d.logger).Log("msg", "Refresh failed", "err", err)
continue
}
select {

View file

@ -20,7 +20,8 @@ import (
"github.com/prometheus/prometheus/config"
"github.com/prometheus/common/log"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/prometheus/common/model"
"golang.org/x/net/context"
apiv1 "k8s.io/client-go/pkg/api/v1"
@ -42,6 +43,9 @@ type Endpoints struct {
// NewEndpoints returns a new endpoints discovery.
func NewEndpoints(l log.Logger, svc, eps, pod cache.SharedInformer) *Endpoints {
if l == nil {
l = log.NewNopLogger()
}
ep := &Endpoints{
logger: l,
endpointsInf: eps,
@ -74,7 +78,7 @@ func (e *Endpoints) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
if tg == nil {
return
}
e.logger.With("tg", fmt.Sprintf("%#v", tg)).Debugln("endpoints update")
level.Debug(e.logger).Log("msg", "endpoints update", "tg", fmt.Sprintf("%#v", tg))
select {
case <-ctx.Done():
case ch <- []*config.TargetGroup{tg}:
@ -87,7 +91,7 @@ func (e *Endpoints) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
eps, err := convertToEndpoints(o)
if err != nil {
e.logger.With("err", err).Errorln("converting to Endpoints object failed")
level.Error(e.logger).Log("msg", "converting to Endpoints object failed", "err", err)
return
}
send(e.buildEndpoints(eps))
@ -97,7 +101,7 @@ func (e *Endpoints) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
eps, err := convertToEndpoints(o)
if err != nil {
e.logger.With("err", err).Errorln("converting to Endpoints object failed")
level.Error(e.logger).Log("msg", "converting to Endpoints object failed", "err", err)
return
}
send(e.buildEndpoints(eps))
@ -107,7 +111,7 @@ func (e *Endpoints) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
eps, err := convertToEndpoints(o)
if err != nil {
e.logger.With("err", err).Errorln("converting to Endpoints object failed")
level.Error(e.logger).Log("msg", "converting to Endpoints object failed", "err", err)
return
}
send(&config.TargetGroup{Source: endpointsSource(eps)})
@ -117,7 +121,7 @@ func (e *Endpoints) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
serviceUpdate := func(o interface{}) {
svc, err := convertToService(o)
if err != nil {
e.logger.With("err", err).Errorln("converting to Service object failed")
level.Error(e.logger).Log("msg", "converting to Service object failed", "err", err)
return
}
@ -129,7 +133,7 @@ func (e *Endpoints) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
send(e.buildEndpoints(obj.(*apiv1.Endpoints)))
}
if err != nil {
e.logger.With("err", err).Errorln("retrieving endpoints failed")
level.Error(e.logger).Log("msg", "retrieving endpoints failed", "err", err)
}
}
e.serviceInf.AddEventHandler(cache.ResourceEventHandlerFuncs{
@ -309,7 +313,7 @@ func (e *Endpoints) resolvePodRef(ref *apiv1.ObjectReference) *apiv1.Pod {
return nil
}
if err != nil {
e.logger.With("err", err).Errorln("resolving pod ref failed")
level.Error(e.logger).Log("msg", "resolving pod ref failed", "err", err)
}
return obj.(*apiv1.Pod)
}
@ -324,7 +328,7 @@ func (e *Endpoints) addServiceLabels(ns, name string, tg *config.TargetGroup) {
return
}
if err != nil {
e.logger.With("err", err).Errorln("retrieving service failed")
level.Error(e.logger).Log("msg", "retrieving service failed", "err", err)
}
svc = obj.(*apiv1.Service)

View file

@ -16,7 +16,6 @@ package kubernetes
import (
"testing"
"github.com/prometheus/common/log"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/config"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@ -36,7 +35,7 @@ func makeTestEndpointsDiscovery() (*Endpoints, *fakeInformer, *fakeInformer, *fa
svc := newFakeServiceInformer()
eps := newFakeEndpointsInformer()
pod := newFakePodInformer()
return NewEndpoints(log.Base(), svc, eps, pod), svc, eps, pod
return NewEndpoints(nil, svc, eps, pod), svc, eps, pod
}
func makeEndpoints() *v1.Endpoints {

View file

@ -21,10 +21,10 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/common/log"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/prometheus/common/model"
"golang.org/x/net/context"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/pkg/api"
apiv1 "k8s.io/client-go/pkg/api/v1"
@ -69,14 +69,6 @@ type Discovery struct {
namespaceDiscovery *config.KubernetesNamespaceDiscovery
}
func init() {
runtime.ErrorHandlers = []func(error){
func(err error) {
log.With("component", "kube_client_runtime").Errorln(err)
},
}
}
func (d *Discovery) getNamespaces() []string {
namespaces := d.namespaceDiscovery.Names
if len(namespaces) == 0 {
@ -87,6 +79,9 @@ func (d *Discovery) getNamespaces() []string {
// New creates a new Kubernetes discovery for the given role.
func New(l log.Logger, conf *config.KubernetesSDConfig) (*Discovery, error) {
if l == nil {
l = log.NewNopLogger()
}
var (
kcfg *rest.Config
err error
@ -101,18 +96,19 @@ func New(l log.Logger, conf *config.KubernetesSDConfig) (*Discovery, error) {
// Because the handling of configuration parameters changes
// we should inform the user when their currently configured values
// will be ignored due to precedence of InClusterConfig
l.Info("Using pod service account via in-cluster config")
level.Info(l).Log("msg", "Using pod service account via in-cluster config")
if conf.TLSConfig.CAFile != "" {
l.Warn("Configured TLS CA file is ignored when using pod service account")
level.Warn(l).Log("msg", "Configured TLS CA file is ignored when using pod service account")
}
if conf.TLSConfig.CertFile != "" || conf.TLSConfig.KeyFile != "" {
l.Warn("Configured TLS client certificate is ignored when using pod service account")
level.Warn(l).Log("msg", "Configured TLS client certificate is ignored when using pod service account")
}
if conf.BearerToken != "" {
l.Warn("Configured auth token is ignored when using pod service account")
level.Warn(l).Log("msg", "Configured auth token is ignored when using pod service account")
}
if conf.BasicAuth != nil {
l.Warn("Configured basic authentication credentials are ignored when using pod service account")
level.Warn(l).Log("msg", "Configured basic authentication credentials are ignored when using pod service account")
}
} else {
kcfg = &rest.Config{
@ -171,7 +167,7 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
slw := cache.NewListWatchFromClient(rclient, "services", namespace, nil)
plw := cache.NewListWatchFromClient(rclient, "pods", namespace, nil)
eps := NewEndpoints(
d.logger.With("kubernetes_sd", "endpoint"),
log.With(d.logger, "k8s_sd", "endpoint"),
cache.NewSharedInformer(slw, &apiv1.Service{}, resyncPeriod),
cache.NewSharedInformer(elw, &apiv1.Endpoints{}, resyncPeriod),
cache.NewSharedInformer(plw, &apiv1.Pod{}, resyncPeriod),
@ -201,7 +197,7 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
for _, namespace := range namespaces {
plw := cache.NewListWatchFromClient(rclient, "pods", namespace, nil)
pod := NewPod(
d.logger.With("kubernetes_sd", "pod"),
log.With(d.logger, "k8s_sd", "pod"),
cache.NewSharedInformer(plw, &apiv1.Pod{}, resyncPeriod),
)
go pod.informer.Run(ctx.Done())
@ -221,7 +217,7 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
for _, namespace := range namespaces {
slw := cache.NewListWatchFromClient(rclient, "services", namespace, nil)
svc := NewService(
d.logger.With("kubernetes_sd", "service"),
log.With(d.logger, "k8s_sd", "service"),
cache.NewSharedInformer(slw, &apiv1.Service{}, resyncPeriod),
)
go svc.informer.Run(ctx.Done())
@ -239,7 +235,7 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
case "node":
nlw := cache.NewListWatchFromClient(rclient, "nodes", api.NamespaceAll, nil)
node := NewNode(
d.logger.With("kubernetes_sd", "node"),
log.With(d.logger, "k8s_sd", "node"),
cache.NewSharedInformer(nlw, &apiv1.Node{}, resyncPeriod),
)
go node.informer.Run(ctx.Done())
@ -250,7 +246,7 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
node.Run(ctx, ch)
default:
d.logger.Errorf("unknown Kubernetes discovery kind %q", d.role)
level.Error(d.logger).Log("msg", "unknown Kubernetes discovery kind", "role", d.role)
}
<-ctx.Done()

View file

@ -18,7 +18,8 @@ import (
"net"
"strconv"
"github.com/prometheus/common/log"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/util/strutil"
@ -37,6 +38,9 @@ type Node struct {
// NewNode returns a new node discovery.
func NewNode(l log.Logger, inf cache.SharedInformer) *Node {
if l == nil {
l = log.NewNopLogger()
}
return &Node{logger: l, informer: inf, store: inf.GetStore()}
}
@ -67,7 +71,7 @@ func (n *Node) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
node, err := convertToNode(o)
if err != nil {
n.logger.With("err", err).Errorln("converting to Node object failed")
level.Error(n.logger).Log("msg", "converting to Node object failed", "err", err)
return
}
send(n.buildNode(node))
@ -77,7 +81,7 @@ func (n *Node) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
node, err := convertToNode(o)
if err != nil {
n.logger.With("err", err).Errorln("converting to Node object failed")
level.Error(n.logger).Log("msg", "converting to Node object failed", "err", err)
return
}
send(&config.TargetGroup{Source: nodeSource(node)})
@ -87,7 +91,7 @@ func (n *Node) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
node, err := convertToNode(o)
if err != nil {
n.logger.With("err", err).Errorln("converting to Node object failed")
level.Error(n.logger).Log("msg", "converting to Node object failed", "err", err)
return
}
send(n.buildNode(node))
@ -150,7 +154,7 @@ func (n *Node) buildNode(node *apiv1.Node) *config.TargetGroup {
addr, addrMap, err := nodeAddress(node)
if err != nil {
n.logger.With("err", err).Debugf("No node address found")
level.Warn(n.logger).Log("msg", "No node address found", "err", err)
return nil
}
addr = net.JoinHostPort(addr, strconv.FormatInt(int64(node.Status.DaemonEndpoints.KubeletEndpoint.Port), 10))

View file

@ -20,7 +20,6 @@ import (
"testing"
"time"
"github.com/prometheus/common/log"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/config"
"github.com/stretchr/testify/require"
@ -159,7 +158,7 @@ func newFakeNodeInformer() *fakeInformer {
func makeTestNodeDiscovery() (*Node, *fakeInformer) {
i := newFakeNodeInformer()
return NewNode(log.Base(), i), i
return NewNode(nil, i), i
}
func makeNode(name, address string, labels map[string]string, annotations map[string]string) *v1.Node {

View file

@ -19,7 +19,8 @@ import (
"strconv"
"strings"
"github.com/prometheus/common/log"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/util/strutil"
@ -38,6 +39,9 @@ type Pod struct {
// NewPod creates a new pod discovery.
func NewPod(l log.Logger, pods cache.SharedInformer) *Pod {
if l == nil {
l = log.NewNopLogger()
}
return &Pod{
informer: pods,
store: pods.GetStore(),
@ -53,7 +57,7 @@ func (p *Pod) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
tg := p.buildPod(o.(*apiv1.Pod))
initial = append(initial, tg)
p.logger.With("tg", fmt.Sprintf("%#v", tg)).Debugln("initial pod")
level.Debug(p.logger).Log("msg", "initial pod", "tg", fmt.Sprintf("%#v", tg))
}
select {
case <-ctx.Done():
@ -63,7 +67,7 @@ func (p *Pod) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
// Send target groups for pod updates.
send := func(tg *config.TargetGroup) {
p.logger.With("tg", fmt.Sprintf("%#v", tg)).Debugln("pod update")
level.Debug(p.logger).Log("msg", "pod update", "tg", fmt.Sprintf("%#v", tg))
select {
case <-ctx.Done():
case ch <- []*config.TargetGroup{tg}:
@ -75,7 +79,7 @@ func (p *Pod) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
pod, err := convertToPod(o)
if err != nil {
p.logger.With("err", err).Errorln("converting to Pod object failed")
level.Error(p.logger).Log("msg", "converting to Pod object failed", "err", err)
return
}
send(p.buildPod(pod))
@ -85,7 +89,7 @@ func (p *Pod) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
pod, err := convertToPod(o)
if err != nil {
p.logger.With("err", err).Errorln("converting to Pod object failed")
level.Error(p.logger).Log("msg", "converting to Pod object failed", "err", err)
return
}
send(&config.TargetGroup{Source: podSource(pod)})
@ -95,7 +99,7 @@ func (p *Pod) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
pod, err := convertToPod(o)
if err != nil {
p.logger.With("err", err).Errorln("converting to Pod object failed")
level.Error(p.logger).Log("msg", "converting to Pod object failed", "err", err)
return
}
send(p.buildPod(pod))

View file

@ -16,7 +16,6 @@ package kubernetes
import (
"testing"
"github.com/prometheus/common/log"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/config"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@ -34,7 +33,7 @@ func newFakePodInformer() *fakeInformer {
func makeTestPodDiscovery() (*Pod, *fakeInformer) {
i := newFakePodInformer()
return NewPod(log.Base(), i), i
return NewPod(nil, i), i
}
func makeMultiPortPod() *v1.Pod {

View file

@ -18,7 +18,8 @@ import (
"net"
"strconv"
"github.com/prometheus/common/log"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/util/strutil"
@ -36,6 +37,9 @@ type Service struct {
// NewService returns a new service discovery.
func NewService(l log.Logger, inf cache.SharedInformer) *Service {
if l == nil {
l = log.NewNopLogger()
}
return &Service{logger: l, informer: inf, store: inf.GetStore()}
}
@ -66,7 +70,7 @@ func (s *Service) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
svc, err := convertToService(o)
if err != nil {
s.logger.With("err", err).Errorln("converting to Service object failed")
level.Error(s.logger).Log("msg", "converting to Service object failed", "err", err)
return
}
send(s.buildService(svc))
@ -76,7 +80,7 @@ func (s *Service) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
svc, err := convertToService(o)
if err != nil {
s.logger.With("err", err).Errorln("converting to Service object failed")
level.Error(s.logger).Log("msg", "converting to Service object failed", "err", err)
return
}
send(&config.TargetGroup{Source: serviceSource(svc)})
@ -86,7 +90,7 @@ func (s *Service) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
svc, err := convertToService(o)
if err != nil {
s.logger.With("err", err).Errorln("converting to Service object failed")
level.Error(s.logger).Log("msg", "converting to Service object failed", "err", err)
return
}
send(s.buildService(svc))

View file

@ -17,7 +17,6 @@ import (
"fmt"
"testing"
"github.com/prometheus/common/log"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/config"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@ -35,7 +34,7 @@ func newFakeServiceInformer() *fakeInformer {
func makeTestServiceDiscovery() (*Service, *fakeInformer) {
i := newFakeServiceInformer()
return NewService(log.Base(), i), i
return NewService(nil, i), i
}
func makeMultiPortService() *v1.Service {

View file

@ -26,8 +26,9 @@ import (
"golang.org/x/net/context"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/log"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/util/httputil"
@ -94,6 +95,10 @@ type Discovery struct {
// NewDiscovery returns a new Marathon Discovery.
func NewDiscovery(conf *config.MarathonSDConfig, logger log.Logger) (*Discovery, error) {
if logger == nil {
logger = log.NewNopLogger()
}
tls, err := httputil.NewTLSConfig(conf.TLSConfig)
if err != nil {
return nil, err
@ -134,7 +139,7 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
case <-time.After(d.refreshInterval):
err := d.updateServices(ctx, ch)
if err != nil {
d.logger.Errorf("Error while updating services: %s", err)
level.Error(d.logger).Log("msg", "Error while updating services", "err", err)
}
}
}
@ -173,7 +178,7 @@ func (d *Discovery) updateServices(ctx context.Context, ch chan<- []*config.Targ
case <-ctx.Done():
return ctx.Err()
case ch <- []*config.TargetGroup{{Source: source}}:
d.logger.Debugf("Removing group for %s", source)
level.Debug(d.logger).Log("msg", "Removing group", "source", source)
}
}
}

View file

@ -19,7 +19,6 @@ import (
"testing"
"time"
"github.com/prometheus/common/log"
"github.com/prometheus/common/model"
"golang.org/x/net/context"
@ -33,7 +32,7 @@ var (
)
func testUpdateServices(client AppListClient, ch chan []*config.TargetGroup) error {
md, err := NewDiscovery(&conf, log.Base())
md, err := NewDiscovery(&conf, nil)
if err != nil {
return err
}
@ -141,7 +140,7 @@ func TestMarathonSDSendGroup(t *testing.T) {
func TestMarathonSDRemoveApp(t *testing.T) {
var ch = make(chan []*config.TargetGroup, 1)
md, err := NewDiscovery(&conf, log.Base())
md, err := NewDiscovery(&conf, nil)
if err != nil {
t.Fatalf("%s", err)
}
@ -177,7 +176,7 @@ func TestMarathonSDRunAndStop(t *testing.T) {
ch = make(chan []*config.TargetGroup)
doneCh = make(chan error)
)
md, err := NewDiscovery(&conf, log.Base())
md, err := NewDiscovery(&conf, nil)
if err != nil {
t.Fatalf("%s", err)
}

View file

@ -18,11 +18,12 @@ import (
"net"
"time"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/gophercloud/gophercloud"
"github.com/gophercloud/gophercloud/openstack"
"github.com/gophercloud/gophercloud/openstack/compute/v2/extensions/hypervisors"
"github.com/gophercloud/gophercloud/pagination"
"github.com/prometheus/common/log"
"github.com/prometheus/common/model"
"golang.org/x/net/context"
@ -58,7 +59,7 @@ func (h *HypervisorDiscovery) Run(ctx context.Context, ch chan<- []*config.Targe
// Get an initial set right away.
tg, err := h.refresh()
if err != nil {
h.logger.Error(err)
level.Error(h.logger).Log("msg", "Unable refresh target groups", "err", err.Error())
} else {
select {
case ch <- []*config.TargetGroup{tg}:
@ -75,7 +76,7 @@ func (h *HypervisorDiscovery) Run(ctx context.Context, ch chan<- []*config.Targe
case <-ticker.C:
tg, err := h.refresh()
if err != nil {
h.logger.Error(err)
level.Error(h.logger).Log("msg", "Unable refresh target groups", "err", err.Error())
continue
}

View file

@ -18,12 +18,13 @@ import (
"net"
"time"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/gophercloud/gophercloud"
"github.com/gophercloud/gophercloud/openstack"
"github.com/gophercloud/gophercloud/openstack/compute/v2/extensions/floatingips"
"github.com/gophercloud/gophercloud/openstack/compute/v2/servers"
"github.com/gophercloud/gophercloud/pagination"
"github.com/prometheus/common/log"
"github.com/prometheus/common/model"
"golang.org/x/net/context"
@ -63,7 +64,7 @@ func (i *InstanceDiscovery) Run(ctx context.Context, ch chan<- []*config.TargetG
// Get an initial set right away.
tg, err := i.refresh()
if err != nil {
i.logger.Error(err)
level.Error(i.logger).Log("msg", "Unable to refresh target groups", "err", err.Error())
} else {
select {
case ch <- []*config.TargetGroup{tg}:
@ -80,7 +81,7 @@ func (i *InstanceDiscovery) Run(ctx context.Context, ch chan<- []*config.TargetG
case <-ticker.C:
tg, err := i.refresh()
if err != nil {
i.logger.Error(err)
level.Error(i.logger).Log("msg", "Unable to refresh target groups", "err", err.Error())
continue
}
@ -155,27 +156,27 @@ func (i *InstanceDiscovery) refresh() (*config.TargetGroup, error) {
openstackLabelInstanceID: model.LabelValue(s.ID),
}
if len(s.Addresses) == 0 {
i.logger.Info("Got no IP address for instance %s", s.ID)
level.Info(i.logger).Log("msg", "Got no IP address", "instance", s.ID)
continue
}
for _, address := range s.Addresses {
md, ok := address.([]interface{})
if !ok {
i.logger.Warn("Invalid type for address, expected array")
level.Warn(i.logger).Log("msg", "Invalid type for address, expected array")
continue
}
if len(md) == 0 {
i.logger.Debugf("Got no IP address for instance %s", s.ID)
level.Debug(i.logger).Log("msg", "Got no IP address", "instance", s.ID)
continue
}
md1, ok := md[0].(map[string]interface{})
if !ok {
i.logger.Warn("Invalid type for address, expected dict")
level.Warn(i.logger).Log("msg", "Invalid type for address, expected dict")
continue
}
addr, ok := md1["addr"].(string)
if !ok {
i.logger.Warn("Invalid type for address, expected string")
level.Warn(i.logger).Log("msg", "Invalid type for address, expected string")
continue
}
labels[openstackLabelPrivateIP] = model.LabelValue(addr)
@ -191,7 +192,7 @@ func (i *InstanceDiscovery) refresh() (*config.TargetGroup, error) {
labels[openstackLabelInstanceName] = model.LabelValue(s.Name)
id, ok := s.Flavor["id"].(string)
if !ok {
i.logger.Warn("Invalid type for instance id, excepted string")
level.Warn(i.logger).Log("msg", "Invalid type for instance id, excepted string")
continue
}
labels[openstackLabelInstanceFlavor] = model.LabelValue(id)

View file

@ -20,7 +20,6 @@ import (
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"github.com/prometheus/common/log"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/config"
)
@ -58,7 +57,7 @@ func (s *OpenstackSDInstanceTestSuite) openstackAuthSuccess() (Discovery, error)
Region: "RegionOne",
Role: "instance",
}
return NewDiscovery(&conf, log.Base())
return NewDiscovery(&conf, nil)
}
func (s *OpenstackSDInstanceTestSuite) TestOpenstackSDInstanceRefresh() {

View file

@ -17,9 +17,9 @@ import (
"errors"
"time"
"github.com/go-kit/kit/log"
"github.com/gophercloud/gophercloud"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/log"
"golang.org/x/net/context"
"github.com/prometheus/prometheus/config"

View file

@ -20,8 +20,9 @@ import (
"net/http"
"time"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/log"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/util/httputil"
@ -77,6 +78,10 @@ type Discovery struct {
// New returns a new Discovery which periodically refreshes its targets.
func New(logger log.Logger, conf *config.TritonSDConfig) (*Discovery, error) {
if logger == nil {
logger = log.NewNopLogger()
}
tls, err := httputil.NewTLSConfig(conf.TLSConfig)
if err != nil {
return nil, err
@ -103,7 +108,7 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
// Get an initial set right away.
tg, err := d.refresh()
if err != nil {
d.logger.With("err", err).Error("Refreshing targets failed")
level.Error(d.logger).Log("msg", "Refreshing targets failed", "err", err)
} else {
ch <- []*config.TargetGroup{tg}
}
@ -113,7 +118,7 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
case <-ticker.C:
tg, err := d.refresh()
if err != nil {
d.logger.With("err", err).Error("Refreshing targets failed")
level.Error(d.logger).Log("msg", "Refreshing targets failed", "err", err)
} else {
ch <- []*config.TargetGroup{tg}
}

View file

@ -23,7 +23,6 @@ import (
"testing"
"time"
"github.com/prometheus/common/log"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/config"
"github.com/stretchr/testify/assert"
@ -54,17 +53,14 @@ var (
CertFile: "shouldnotexist.cert",
},
}
logger = log.Base()
)
func TestTritonSDNew(t *testing.T) {
td, err := New(logger, &conf)
td, err := New(nil, &conf)
assert.Nil(t, err)
assert.NotNil(t, td)
assert.NotNil(t, td.client)
assert.NotNil(t, td.interval)
assert.NotNil(t, td.logger)
assert.Equal(t, logger, td.logger, "td.logger equals logger")
assert.NotNil(t, td.sdConfig)
assert.Equal(t, conf.Account, td.sdConfig.Account)
assert.Equal(t, conf.DNSSuffix, td.sdConfig.DNSSuffix)
@ -73,14 +69,14 @@ func TestTritonSDNew(t *testing.T) {
}
func TestTritonSDNewBadConfig(t *testing.T) {
td, err := New(logger, &badconf)
td, err := New(nil, &badconf)
assert.NotNil(t, err)
assert.Nil(t, td)
}
func TestTritonSDRun(t *testing.T) {
var (
td, err = New(logger, &conf)
td, err = New(nil, &conf)
ch = make(chan []*config.TargetGroup)
ctx, cancel = context.WithCancel(context.Background())
)
@ -132,7 +128,7 @@ func TestTritonSDRefreshMultipleTargets(t *testing.T) {
func TestTritonSDRefreshNoServer(t *testing.T) {
var (
td, err = New(logger, &conf)
td, err = New(nil, &conf)
)
assert.Nil(t, err)
assert.NotNil(t, td)
@ -146,7 +142,7 @@ func TestTritonSDRefreshNoServer(t *testing.T) {
func testTritonSDRefresh(t *testing.T, dstr string) []model.LabelSet {
var (
td, err = New(logger, &conf)
td, err = New(nil, &conf)
s = httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
fmt.Fprintln(w, dstr)
}))

View file

@ -24,7 +24,7 @@ import (
"github.com/samuel/go-zookeeper/zk"
"golang.org/x/net/context"
"github.com/prometheus/common/log"
"github.com/go-kit/kit/log"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/util/strutil"
"github.com/prometheus/prometheus/util/treecache"
@ -63,6 +63,10 @@ func NewDiscovery(
logger log.Logger,
pf func(data []byte, path string) (model.LabelSet, error),
) *Discovery {
if logger == nil {
logger = log.NewNopLogger()
}
conn, _, err := zk.Connect(srvs, timeout)
conn.SetLogger(treecache.ZookeeperLogger{})
if err != nil {

View file

@ -21,12 +21,15 @@ import (
"sort"
"time"
"github.com/prometheus/common/log"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/prometheus/common/model"
)
// Client allows sending batches of Prometheus samples to Graphite.
type Client struct {
logger log.Logger
address string
transport string
timeout time.Duration
@ -34,8 +37,12 @@ type Client struct {
}
// NewClient creates a new Client.
func NewClient(address string, transport string, timeout time.Duration, prefix string) *Client {
func NewClient(logger log.Logger, address string, transport string, timeout time.Duration, prefix string) *Client {
if logger == nil {
logger = log.NewNopLogger()
}
return &Client{
logger: logger,
address: address,
transport: transport,
timeout: timeout,
@ -86,8 +93,7 @@ func (c *Client) Write(samples model.Samples) error {
t := float64(s.Timestamp.UnixNano()) / 1e9
v := float64(s.Value)
if math.IsNaN(v) || math.IsInf(v, 0) {
log.Warnf("cannot send value %f to Graphite,"+
"skipping sample %#v", v, s)
level.Warn(c.logger).Log("msg", "cannot send value to Graphite, skipping sample", "value", v, "sample", s)
continue
}
fmt.Fprintf(&buf, "%s %f %f\n", k, v, t)

View file

@ -17,10 +17,12 @@ import (
"encoding/json"
"fmt"
"math"
"os"
"strings"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/log"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/prompb"
@ -29,6 +31,8 @@ import (
// Client allows sending batches of Prometheus samples to InfluxDB.
type Client struct {
logger log.Logger
client influx.Client
database string
retentionPolicy string
@ -36,14 +40,16 @@ type Client struct {
}
// NewClient creates a new Client.
func NewClient(conf influx.HTTPConfig, db string, rp string) *Client {
func NewClient(logger log.Logger, conf influx.HTTPConfig, db string, rp string) *Client {
c, err := influx.NewHTTPClient(conf)
// Currently influx.NewClient() *should* never return an error.
if err != nil {
log.Fatal(err)
level.Error(logger).Log("err", err)
os.Exit(1)
}
return &Client{
logger: logger,
client: c,
database: db,
retentionPolicy: rp,
@ -73,7 +79,7 @@ func (c *Client) Write(samples model.Samples) error {
for _, s := range samples {
v := float64(s.Value)
if math.IsNaN(v) || math.IsInf(v, 0) {
log.Debugf("cannot send value %f to InfluxDB, skipping sample %#v", v, s)
level.Debug(c.logger).Log("msg", "cannot send to InfluxDB, skipping sample", "value", v, "sample", s)
c.ignoredSamples.Inc()
continue
}

View file

@ -25,10 +25,11 @@ import (
"sync"
"time"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/gogo/protobuf/proto"
"github.com/golang/snappy"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/log"
"github.com/prometheus/common/model"
influx "github.com/influxdata/influxdb/client/v2"
@ -36,6 +37,7 @@ import (
"github.com/prometheus/prometheus/documentation/examples/remote_storage/remote_storage_adapter/graphite"
"github.com/prometheus/prometheus/documentation/examples/remote_storage/remote_storage_adapter/influxdb"
"github.com/prometheus/prometheus/documentation/examples/remote_storage/remote_storage_adapter/opentsdb"
"github.com/prometheus/prometheus/pkg/promlog"
"github.com/prometheus/prometheus/prompb"
)
@ -96,8 +98,13 @@ func main() {
cfg := parseFlags()
http.Handle(cfg.telemetryPath, prometheus.Handler())
writers, readers := buildClients(cfg)
serve(cfg.listenAddr, writers, readers)
logLevel := promlog.AllowedLevel{}
logLevel.Set("debug")
logger := promlog.New(logLevel)
writers, readers := buildClients(logger, cfg)
serve(logger, cfg.listenAddr, writers, readers)
}
func parseFlags() *config {
@ -150,23 +157,29 @@ type reader interface {
Name() string
}
func buildClients(cfg *config) ([]writer, []reader) {
func buildClients(logger log.Logger, cfg *config) ([]writer, []reader) {
var writers []writer
var readers []reader
if cfg.graphiteAddress != "" {
c := graphite.NewClient(
log.With(logger, "storage", "Graphite"),
cfg.graphiteAddress, cfg.graphiteTransport,
cfg.remoteTimeout, cfg.graphitePrefix)
writers = append(writers, c)
}
if cfg.opentsdbURL != "" {
c := opentsdb.NewClient(cfg.opentsdbURL, cfg.remoteTimeout)
c := opentsdb.NewClient(
log.With(logger, "storage", "OpenTSDB"),
cfg.opentsdbURL,
cfg.remoteTimeout,
)
writers = append(writers, c)
}
if cfg.influxdbURL != "" {
url, err := url.Parse(cfg.influxdbURL)
if err != nil {
log.Fatalf("Failed to parse InfluxDB URL %q: %v", cfg.influxdbURL, err)
level.Error(logger).Log("msg", "Failed to parse InfluxDB URL", "url", cfg.influxdbURL, "err", err)
os.Exit(1)
}
conf := influx.HTTPConfig{
Addr: url.String(),
@ -174,7 +187,12 @@ func buildClients(cfg *config) ([]writer, []reader) {
Password: cfg.influxdbPassword,
Timeout: cfg.remoteTimeout,
}
c := influxdb.NewClient(conf, cfg.influxdbDatabase, cfg.influxdbRetentionPolicy)
c := influxdb.NewClient(
log.With(logger, "storage", "InfluxDB"),
conf,
cfg.influxdbDatabase,
cfg.influxdbRetentionPolicy,
)
prometheus.MustRegister(c)
writers = append(writers, c)
readers = append(readers, c)
@ -183,7 +201,7 @@ func buildClients(cfg *config) ([]writer, []reader) {
return writers, readers
}
func serve(addr string, writers []writer, readers []reader) error {
func serve(logger log.Logger, addr string, writers []writer, readers []reader) error {
http.HandleFunc("/write", func(w http.ResponseWriter, r *http.Request) {
compressed, err := ioutil.ReadAll(r.Body)
if err != nil {
@ -213,7 +231,7 @@ func serve(addr string, writers []writer, readers []reader) error {
for _, w := range writers {
wg.Add(1)
go func(rw writer) {
sendSamples(rw, samples)
sendSamples(logger, rw, samples)
wg.Done()
}(w)
}
@ -252,7 +270,7 @@ func serve(addr string, writers []writer, readers []reader) error {
var resp *prompb.ReadResponse
resp, err = reader.Read(&req)
if err != nil {
log.With("query", req).With("storage", reader.Name()).With("err", err).Warnf("Error executing query")
level.Warn(logger).Log("msg", "Error executing query", "query", req, "storage", reader.Name(), "err", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
@ -295,12 +313,12 @@ func protoToSamples(req *prompb.WriteRequest) model.Samples {
return samples
}
func sendSamples(w writer, samples model.Samples) {
func sendSamples(logger log.Logger, w writer, samples model.Samples) {
begin := time.Now()
err := w.Write(samples)
duration := time.Since(begin).Seconds()
if err != nil {
log.With("num_samples", len(samples)).With("storage", w.Name()).With("err", err).Warnf("Error sending samples to remote storage")
level.Warn(logger).Log("msg", "Error sending samples to remote storage", "err", err, "storage", w.Name(), "num_samples", len(samples))
failedSamples.WithLabelValues(w.Name()).Add(float64(len(samples)))
}
sentSamples.WithLabelValues(w.Name()).Add(float64(len(samples)))

View file

@ -23,7 +23,8 @@ import (
"net/url"
"time"
"github.com/prometheus/common/log"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/prometheus/common/model"
"golang.org/x/net/context"
"golang.org/x/net/context/ctxhttp"
@ -36,13 +37,16 @@ const (
// Client allows sending batches of Prometheus samples to OpenTSDB.
type Client struct {
logger log.Logger
url string
timeout time.Duration
}
// NewClient creates a new Client.
func NewClient(url string, timeout time.Duration) *Client {
func NewClient(logger log.Logger, url string, timeout time.Duration) *Client {
return &Client{
logger: logger,
url: url,
timeout: timeout,
}
@ -75,7 +79,7 @@ func (c *Client) Write(samples model.Samples) error {
for _, s := range samples {
v := float64(s.Value)
if math.IsNaN(v) || math.IsInf(v, 0) {
log.Warnf("cannot send value %f to OpenTSDB, skipping sample %#v", v, s)
level.Warn(c.logger).Log("msg", "cannot send value to OpenTSDB, skipping sample", "value", v, "sample", s)
continue
}
metric := TagValue(s.Metric[model.MetricNameLabel])

View file

@ -26,8 +26,9 @@ import (
"sync/atomic"
"time"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/log"
"github.com/prometheus/common/model"
"golang.org/x/net/context"
"golang.org/x/net/context/ctxhttp"
@ -211,6 +212,9 @@ func New(o *Options, logger log.Logger) *Notifier {
if o.Do == nil {
o.Do = ctxhttp.Do
}
if logger == nil {
logger = log.NewNopLogger()
}
n := &Notifier{
queue: make([]*Alert, 0, o.QueueCapacity),
@ -223,7 +227,14 @@ func New(o *Options, logger log.Logger) *Notifier {
queueLenFunc := func() float64 { return float64(n.queueLen()) }
alertmanagersDiscoveredFunc := func() float64 { return float64(len(n.Alertmanagers())) }
n.metrics = newAlertMetrics(o.Registerer, o.QueueCapacity, queueLenFunc, alertmanagersDiscoveredFunc)
n.metrics = newAlertMetrics(
o.Registerer,
o.QueueCapacity,
queueLenFunc,
alertmanagersDiscoveredFunc,
)
return n
}
@ -337,7 +348,7 @@ func (n *Notifier) Send(alerts ...*Alert) {
if d := len(alerts) - n.opts.QueueCapacity; d > 0 {
alerts = alerts[d:]
n.logger.Warnf("Alert batch larger than queue capacity, dropping %d alerts", d)
level.Warn(n.logger).Log("msg", "Alert batch larger than queue capacity, dropping alerts", "num_dropped", d)
n.metrics.dropped.Add(float64(d))
}
@ -346,7 +357,7 @@ func (n *Notifier) Send(alerts ...*Alert) {
if d := (len(n.queue) + len(alerts)) - n.opts.QueueCapacity; d > 0 {
n.queue = n.queue[d:]
n.logger.Warnf("Alert notification queue full, dropping %d alerts", d)
level.Warn(n.logger).Log("msg", "Alert notification queue full, dropping alerts", "num_dropped", d)
n.metrics.dropped.Add(float64(d))
}
n.queue = append(n.queue, alerts...)
@ -404,7 +415,7 @@ func (n *Notifier) sendAll(alerts ...*Alert) bool {
b, err := json.Marshal(alerts)
if err != nil {
n.logger.Errorf("Encoding alerts failed: %s", err)
level.Error(n.logger).Log("msg", "Encoding alerts failed", "err", err)
return false
}
@ -429,7 +440,7 @@ func (n *Notifier) sendAll(alerts ...*Alert) bool {
u := am.url().String()
if err := n.sendOne(ctx, ams.client, u, b); err != nil {
n.logger.With("alertmanager", u).With("count", len(alerts)).Errorf("Error sending alerts: %s", err)
level.Error(n.logger).Log("alertmanager", u, "count", len(alerts), "msg", "Error sending alert", "err", err)
n.metrics.errors.WithLabelValues(u).Inc()
} else {
atomic.AddUint64(&numSuccess, 1)
@ -468,7 +479,7 @@ func (n *Notifier) sendOne(ctx context.Context, c *http.Client, url string, b []
// Stop shuts down the notification handler.
func (n *Notifier) Stop() {
n.logger.Info("Stopping notification handler...")
level.Info(n.logger).Log("msg", "Stopping notification handler...")
n.cancel()
}
@ -526,7 +537,7 @@ func (s *alertmanagerSet) Sync(tgs []*config.TargetGroup) {
for _, tg := range tgs {
ams, err := alertmanagerFromGroup(tg, s.cfg)
if err != nil {
s.logger.With("err", err).Error("generating discovered Alertmanagers failed")
level.Error(s.logger).Log("msg", "Creating discovered Alertmanagers failed", "err", err)
continue
}
all = append(all, ams...)

View file

@ -26,7 +26,6 @@ import (
"golang.org/x/net/context"
"github.com/prometheus/common/log"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/pkg/labels"
@ -65,7 +64,7 @@ func TestPostPath(t *testing.T) {
}
func TestHandlerNextBatch(t *testing.T) {
h := New(&Options{}, log.Base())
h := New(&Options{}, nil)
for i := range make([]struct{}, 2*maxBatchSize+1) {
h.queue = append(h.queue, &Alert{
@ -152,7 +151,7 @@ func TestHandlerSendAll(t *testing.T) {
defer server1.Close()
defer server2.Close()
h := New(&Options{}, log.Base())
h := New(&Options{}, nil)
h.alertmanagers = append(h.alertmanagers, &alertmanagerSet{
ams: []alertmanager{
alertmanagerMock{
@ -215,7 +214,7 @@ func TestCustomDo(t *testing.T) {
Body: ioutil.NopCloser(nil),
}, nil
},
}, log.Base())
}, nil)
h.sendOne(context.Background(), nil, testURL, []byte(testBody))
@ -237,7 +236,7 @@ func TestExternalLabels(t *testing.T) {
Replacement: "c",
},
},
}, log.Base())
}, nil)
// This alert should get the external label attached.
h.Send(&Alert{
@ -277,7 +276,7 @@ func TestHandlerRelabel(t *testing.T) {
Replacement: "renamed",
},
},
}, log.Base())
}, nil)
// This alert should be dropped due to the configuration
h.Send(&Alert{
@ -324,7 +323,7 @@ func TestHandlerQueueing(t *testing.T) {
h := New(&Options{
QueueCapacity: 3 * maxBatchSize,
},
log.Base(),
nil,
)
h.alertmanagers = append(h.alertmanagers, &alertmanagerSet{
ams: []alertmanager{

View file

@ -23,9 +23,10 @@ import (
"sync"
"time"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
opentracing "github.com/opentracing/opentracing-go"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/log"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/pkg/timestamp"
"github.com/prometheus/prometheus/pkg/value"
@ -239,7 +240,7 @@ type EngineOptions struct {
var DefaultEngineOptions = &EngineOptions{
MaxConcurrentQueries: 20,
Timeout: 2 * time.Minute,
Logger: log.Base(),
Logger: log.NewNopLogger(),
}
// NewInstantQuery returns an evaluation query for the given expression at the given time.
@ -517,7 +518,7 @@ func (ng *Engine) populateIterators(ctx context.Context, s *EvalStmt) (storage.Q
n.series, err = expandSeriesSet(querier.Select(n.LabelMatchers...))
if err != nil {
// TODO(fabxc): use multi-error.
ng.logger.Errorln("expand series set:", err)
level.Error(ng.logger).Log("msg", "error expanding series set", "err", err)
return false
}
for _, s := range n.series {
@ -528,7 +529,7 @@ func (ng *Engine) populateIterators(ctx context.Context, s *EvalStmt) (storage.Q
case *MatrixSelector:
n.series, err = expandSeriesSet(querier.Select(n.LabelMatchers...))
if err != nil {
ng.logger.Errorln("expand series set:", err)
level.Error(ng.logger).Log("msg", "error expanding series set", "err", err)
return false
}
for _, s := range n.series {
@ -580,17 +581,18 @@ func (ev *evaluator) error(err error) {
// recover is the handler that turns panics into returns from the top level of evaluation.
func (ev *evaluator) recover(errp *error) {
e := recover()
if e != nil {
if _, ok := e.(runtime.Error); ok {
// Print the stack trace but do not inhibit the running application.
buf := make([]byte, 64<<10)
buf = buf[:runtime.Stack(buf, false)]
if e == nil {
return
}
if _, ok := e.(runtime.Error); ok {
// Print the stack trace but do not inhibit the running application.
buf := make([]byte, 64<<10)
buf = buf[:runtime.Stack(buf, false)]
ev.logger.Errorf("parser panic: %v\n%s", e, buf)
*errp = fmt.Errorf("unexpected error")
} else {
*errp = e.(error)
}
level.Error(ev.logger).Log("msg", "runtime panic in parser", "err", e, "stacktrace", string(buf))
*errp = fmt.Errorf("unexpected error")
} else {
*errp = e.(error)
}
}

View file

@ -21,7 +21,7 @@ import (
"golang.org/x/net/context"
"github.com/prometheus/common/log"
"github.com/go-kit/kit/log"
"github.com/prometheus/prometheus/pkg/labels"
)
@ -296,9 +296,8 @@ load 10s
}
func TestRecoverEvaluatorRuntime(t *testing.T) {
ev := &evaluator{
logger: log.Base(),
}
ev := &evaluator{logger: log.NewNopLogger()}
var err error
defer ev.recover(&err)
@ -312,7 +311,7 @@ func TestRecoverEvaluatorRuntime(t *testing.T) {
}
func TestRecoverEvaluatorError(t *testing.T) {
ev := &evaluator{logger: log.Base()}
ev := &evaluator{logger: log.NewNopLogger()}
var err error
e := fmt.Errorf("custom error")

View file

@ -16,13 +16,13 @@ package promql
import (
"fmt"
"math"
"os"
"runtime"
"sort"
"strconv"
"strings"
"time"
"github.com/prometheus/common/log"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/pkg/value"
@ -342,7 +342,7 @@ func (p *parser) recover(errp *error) {
buf := make([]byte, 64<<10)
buf = buf[:runtime.Stack(buf, false)]
log.Errorf("parser panic: %v\n%s", e, buf)
fmt.Fprintf(os.Stderr, "parser panic: %v\n%s", e, buf)
*errp = errUnexpected
} else {
*errp = e.(error)

View file

@ -25,8 +25,9 @@ import (
"time"
"unsafe"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/log"
"github.com/prometheus/common/version"
"golang.org/x/net/context"
@ -142,7 +143,7 @@ func newScrapePool(ctx context.Context, cfg *config.ScrapeConfig, app Appendable
client, err := httputil.NewClientFromConfig(cfg.HTTPClientConfig)
if err != nil {
// Any errors that could occur here should be caught during config validation.
logger.Errorf("Error creating HTTP client for job %q: %s", cfg.JobName, err)
level.Error(logger).Log("msg", "Error creating HTTP client", "err", err)
}
newLoop := func(
@ -201,7 +202,7 @@ func (sp *scrapePool) reload(cfg *config.ScrapeConfig) {
client, err := httputil.NewClientFromConfig(cfg.HTTPClientConfig)
if err != nil {
// Any errors that could occur here should be caught during config validation.
sp.logger.Errorf("Error creating HTTP client for job %q: %s", cfg.JobName, err)
level.Error(sp.logger).Log("msg", "Error creating HTTP client", "err", err)
}
sp.config = cfg
sp.client = client
@ -223,7 +224,7 @@ func (sp *scrapePool) reload(cfg *config.ScrapeConfig) {
func() storage.Appender {
return sp.reportAppender(t)
},
sp.logger.With("target", t.labels.String()),
log.With(sp.logger, "target", t.labels),
)
)
wg.Add(1)
@ -253,7 +254,7 @@ func (sp *scrapePool) Sync(tgs []*config.TargetGroup) {
for _, tg := range tgs {
targets, err := targetsFromGroup(tg, sp.config)
if err != nil {
sp.logger.With("err", err).Error("creating targets failed")
level.Error(sp.logger).Log("msg", "creating targets failed", "err", err)
continue
}
all = append(all, targets...)
@ -293,7 +294,7 @@ func (sp *scrapePool) sync(targets []*Target) {
func() storage.Appender {
return sp.reportAppender(t)
},
sp.logger.With("target", t.labels.String()),
log.With(sp.logger, "target", t.labels),
)
sp.targets[hash] = t
@ -576,7 +577,7 @@ func newScrapeLoop(
l log.Logger,
) *scrapeLoop {
if l == nil {
l = log.Base()
l = log.NewNopLogger()
}
sl := &scrapeLoop{
scraper: sc,
@ -638,7 +639,7 @@ mainLoop:
if scrapeErr == nil {
b = buf.Bytes()
} else {
sl.l.With("err", scrapeErr.Error()).Debug("scrape failed")
level.Debug(sl.l).Log("msg", "Scrape failed", "err", scrapeErr.Error())
if errc != nil {
errc <- scrapeErr
}
@ -648,11 +649,11 @@ mainLoop:
// we still call sl.append to trigger stale markers.
total, added, appErr := sl.append(b, start)
if appErr != nil {
sl.l.With("err", appErr).Warn("append failed")
level.Warn(sl.l).Log("msg", "append failed", "err", appErr)
// The append failed, probably due to a parse error or sample limit.
// Call sl.append again with an empty scrape to trigger stale markers.
if _, _, err := sl.append([]byte{}, start); err != nil {
sl.l.With("err", err).Error("append failed")
level.Warn(sl.l).Log("msg", "append failed", "err", err)
}
}
@ -719,10 +720,10 @@ func (sl *scrapeLoop) endOfRunStaleness(last time.Time, ticker *time.Ticker, int
// If the target has since been recreated and scraped, the
// stale markers will be out of order and ignored.
if _, _, err := sl.append([]byte{}, staleTime); err != nil {
sl.l.With("err", err).Error("stale append failed")
level.Error(sl.l).Log("msg", "stale append failed", "err", err)
}
if err := sl.reportStale(staleTime); err != nil {
sl.l.With("err", err).Error("stale report failed")
level.Error(sl.l).Log("msg", "stale report failed", "err", err)
}
}
@ -791,17 +792,17 @@ loop:
continue
case storage.ErrOutOfOrderSample:
numOutOfOrder++
sl.l.With("timeseries", string(met)).Debug("Out of order sample")
level.Debug(sl.l).Log("msg", "Out of order sample", "series", string(met))
targetScrapeSampleOutOfOrder.Inc()
continue
case storage.ErrDuplicateSampleForTimestamp:
numDuplicates++
sl.l.With("timeseries", string(met)).Debug("Duplicate sample for timestamp")
level.Debug(sl.l).Log("msg", "Duplicate sample for timestamp", "series", string(met))
targetScrapeSampleDuplicate.Inc()
continue
case storage.ErrOutOfBounds:
numOutOfBounds++
sl.l.With("timeseries", string(met)).Debug("Out of bounds metric")
level.Debug(sl.l).Log("msg", "Out of bounds metric", "series", string(met))
targetScrapeSampleOutOfBounds.Inc()
continue
case errSampleLimit:
@ -840,19 +841,19 @@ loop:
case storage.ErrOutOfOrderSample:
err = nil
numOutOfOrder++
sl.l.With("timeseries", string(met)).Debug("Out of order sample")
level.Debug(sl.l).Log("msg", "Out of order sample", "series", string(met))
targetScrapeSampleOutOfOrder.Inc()
continue
case storage.ErrDuplicateSampleForTimestamp:
err = nil
numDuplicates++
sl.l.With("timeseries", string(met)).Debug("Duplicate sample for timestamp")
level.Debug(sl.l).Log("msg", "Duplicate sample for timestamp", "series", string(met))
targetScrapeSampleDuplicate.Inc()
continue
case storage.ErrOutOfBounds:
err = nil
numOutOfBounds++
sl.l.With("timeseries", string(met)).Debug("Out of bounds metric")
level.Debug(sl.l).Log("msg", "Out of bounds metric", "series", string(met))
targetScrapeSampleOutOfBounds.Inc()
continue
case errSampleLimit:
@ -878,13 +879,13 @@ loop:
err = sampleLimitErr
}
if numOutOfOrder > 0 {
sl.l.With("numDropped", numOutOfOrder).Warn("Error on ingesting out-of-order samples")
level.Warn(sl.l).Log("msg", "Error on ingesting out-of-order samples", "num_dropped", numOutOfOrder)
}
if numDuplicates > 0 {
sl.l.With("numDropped", numDuplicates).Warn("Error on ingesting samples with different value but same timestamp")
level.Warn(sl.l).Log("msg", "Error on ingesting samples with different value but same timestamp", "num_dropped", numDuplicates)
}
if numOutOfBounds > 0 {
sl.l.With("numOutOfBounds", numOutOfBounds).Warn("Error on ingesting samples that are too old or are too far into the future")
level.Warn(sl.l).Log("msg", "Error on ingesting samples that are too old or are too far into the future", "num_dropped", numOutOfBounds)
}
if err == nil {
sl.cache.forEachStale(func(lset labels.Labels) bool {

View file

@ -28,7 +28,7 @@ import (
"testing"
"time"
"github.com/prometheus/common/log"
"github.com/go-kit/kit/log"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/require"
"golang.org/x/net/context"
@ -44,7 +44,7 @@ func TestNewScrapePool(t *testing.T) {
var (
app = &nopAppendable{}
cfg = &config.ScrapeConfig{}
sp = newScrapePool(context.Background(), cfg, app, log.Base())
sp = newScrapePool(context.Background(), cfg, app, nil)
)
if a, ok := sp.appendable.(*nopAppendable); !ok || a != app {
@ -167,7 +167,7 @@ func TestScrapePoolReload(t *testing.T) {
targets: map[uint64]*Target{},
loops: map[uint64]loop{},
newLoop: newLoop,
logger: log.Base(),
logger: log.NewNopLogger(),
}
// Reloading a scrape pool with a new scrape configuration must stop all scrape
@ -237,7 +237,7 @@ func TestScrapePoolReportAppender(t *testing.T) {
target := newTestTarget("example.com:80", 10*time.Millisecond, nil)
app := &nopAppendable{}
sp := newScrapePool(context.Background(), cfg, app, log.Base())
sp := newScrapePool(context.Background(), cfg, app, nil)
cfg.HonorLabels = false
wrapped := sp.reportAppender(target)
@ -272,7 +272,7 @@ func TestScrapePoolSampleAppender(t *testing.T) {
target := newTestTarget("example.com:80", 10*time.Millisecond, nil)
app := &nopAppendable{}
sp := newScrapePool(context.Background(), cfg, app, log.Base())
sp := newScrapePool(context.Background(), cfg, app, nil)
sp.maxAheadTime = 0
cfg.HonorLabels = false

View file

@ -16,7 +16,8 @@ package retrieval
import (
"sync"
"github.com/prometheus/common/log"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"golang.org/x/net/context"
"github.com/prometheus/prometheus/config"
@ -64,7 +65,7 @@ func NewTargetManager(app Appendable, logger log.Logger) *TargetManager {
// Run starts background processing to handle target updates.
func (tm *TargetManager) Run() {
tm.logger.Info("Starting target manager...")
level.Info(tm.logger).Log("msg", "Starting target manager...")
tm.mtx.Lock()
@ -78,7 +79,7 @@ func (tm *TargetManager) Run() {
// Stop all background processing.
func (tm *TargetManager) Stop() {
tm.logger.Infoln("Stopping target manager...")
level.Info(tm.logger).Log("msg", "Stopping target manager...")
tm.mtx.Lock()
// Cancel the base context, this will cause all target providers to shut down
@ -90,7 +91,7 @@ func (tm *TargetManager) Stop() {
// Wait for all scrape inserts to complete.
tm.wg.Wait()
tm.logger.Infoln("Target manager stopped.")
level.Info(tm.logger).Log("msg", "Target manager stopped")
}
func (tm *TargetManager) reload() {
@ -106,7 +107,7 @@ func (tm *TargetManager) reload() {
ts = &targetSet{
ctx: ctx,
cancel: cancel,
sp: newScrapePool(ctx, scfg, tm.append, tm.logger),
sp: newScrapePool(ctx, scfg, tm.append, log.With(tm.logger, "scrape_pool", scfg.JobName)),
}
ts.ts = discovery.NewTargetSet(ts.sp)

View file

@ -25,7 +25,8 @@ import (
html_template "html/template"
"github.com/prometheus/common/log"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
@ -118,7 +119,7 @@ func NewAlertingRule(name string, vec promql.Expr, hold time.Duration, lbls, ann
labels: lbls,
annotations: anns,
active: map[uint64]*Alert{},
logger: logger.With("alert", name),
logger: logger,
}
}
@ -203,7 +204,7 @@ func (r *AlertingRule) Eval(ctx context.Context, ts time.Time, engine *promql.En
result, err := tmpl.Expand()
if err != nil {
result = fmt.Sprintf("<error expanding template: %s>", err)
r.logger.Warnf("Error expanding alert template %v with data '%v': %s", r.Name(), tmplData, err)
level.Warn(r.logger).Log("msg", "Expanding alert template failed", "err", err, "data", tmplData)
}
return result
}

View file

@ -16,7 +16,6 @@ package rules
import (
"testing"
"github.com/prometheus/common/log"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/promql"
)
@ -26,7 +25,7 @@ func TestAlertingRuleHTMLSnippet(t *testing.T) {
if err != nil {
t.Fatal(err)
}
rule := NewAlertingRule("testrule", expr, 0, labels.FromStrings("html", "<b>BOLD</b>"), labels.FromStrings("html", "<b>BOLD</b>"), log.Base())
rule := NewAlertingRule("testrule", expr, 0, labels.FromStrings("html", "<b>BOLD</b>"), labels.FromStrings("html", "<b>BOLD</b>"), nil)
const want = `alert: <a href="/test/prefix/graph?g0.expr=ALERTS%7Balertname%3D%22testrule%22%7D&g0.tab=0">testrule</a>
expr: <a href="/test/prefix/graph?g0.expr=foo%7Bhtml%3D%22%3Cb%3EBOLD%3Cb%3E%22%7D&g0.tab=0">foo{html=&#34;&lt;b&gt;BOLD&lt;b&gt;&#34;}</a>

View file

@ -25,8 +25,9 @@ import (
html_template "html/template"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/log"
"golang.org/x/net/context"
"github.com/prometheus/prometheus/config"
@ -151,7 +152,7 @@ func NewGroup(name, file string, interval time.Duration, rules []Rule, opts *Man
seriesInPreviousEval: make([]map[string]labels.Labels, len(rules)),
done: make(chan struct{}),
terminated: make(chan struct{}),
logger: opts.Logger.With("group", name),
logger: log.With(opts.Logger, "group", name),
}
}
@ -308,7 +309,7 @@ func (g *Group) Eval(ts time.Time) {
// Canceled queries are intentional termination of queries. This normally
// happens on shutdown and thus we skip logging of any errors here.
if _, ok := err.(promql.ErrQueryCanceled); !ok {
g.logger.Warnf("Error while evaluating rule %q: %s", rule, err)
level.Warn(g.logger).Log("msg", "Evaluating rule failed", "rule", rule, "err", err)
}
evalFailures.WithLabelValues(rtyp).Inc()
return
@ -324,7 +325,7 @@ func (g *Group) Eval(ts time.Time) {
app, err := g.opts.Appendable.Appender()
if err != nil {
g.logger.With("err", err).Warn("creating appender failed")
level.Warn(g.logger).Log("msg", "creating appender failed", "err", err)
return
}
@ -334,22 +335,22 @@ func (g *Group) Eval(ts time.Time) {
switch err {
case storage.ErrOutOfOrderSample:
numOutOfOrder++
g.logger.With("sample", s).With("err", err).Debug("Rule evaluation result discarded")
level.Debug(g.logger).Log("msg", "Rule evaluation result discarded", "err", err, "sample", s)
case storage.ErrDuplicateSampleForTimestamp:
numDuplicates++
g.logger.With("sample", s).With("err", err).Debug("Rule evaluation result discarded")
level.Debug(g.logger).Log("msg", "Rule evaluation result discarded", "err", err, "sample", s)
default:
g.logger.With("sample", s).With("err", err).Warn("Rule evaluation result discarded")
level.Warn(g.logger).Log("msg", "Rule evaluation result discarded", "err", err, "sample", s)
}
} else {
seriesReturned[s.Metric.String()] = s.Metric
}
}
if numOutOfOrder > 0 {
g.logger.With("numDropped", numOutOfOrder).Warn("Error on ingesting out-of-order result from rule evaluation")
level.Warn(g.logger).Log("msg", "Error on ingesting out-of-order result from rule evaluation", "numDropped", numOutOfOrder)
}
if numDuplicates > 0 {
g.logger.With("numDropped", numDuplicates).Warn("Error on ingesting results from rule evaluation with different value but same timestamp")
level.Warn(g.logger).Log("msg", "Error on ingesting results from rule evaluation with different value but same timestamp", "numDropped", numDuplicates)
}
for metric, lset := range g.seriesInPreviousEval[i] {
@ -362,12 +363,12 @@ func (g *Group) Eval(ts time.Time) {
// Do not count these in logging, as this is expected if series
// is exposed from a different rule.
default:
g.logger.With("sample", metric).With("err", err).Warn("adding stale sample failed")
level.Warn(g.logger).Log("msg", "adding stale sample failed", "sample", metric, "err", err)
}
}
}
if err := app.Commit(); err != nil {
g.logger.With("err", err).Warn("rule sample appending failed")
level.Warn(g.logger).Log("msg", "rule sample appending failed", "err", err)
} else {
g.seriesInPreviousEval[i] = seriesReturned
}
@ -451,13 +452,13 @@ func (m *Manager) Stop() {
m.mtx.Lock()
defer m.mtx.Unlock()
m.logger.Info("Stopping rule manager...")
level.Info(m.logger).Log("msg", "Stopping rule manager...")
for _, eg := range m.groups {
eg.stop()
}
m.logger.Info("Rule manager stopped.")
level.Info(m.logger).Log("msg", "Rule manager stopped")
}
// ApplyConfig updates the rule manager's state as the config requires. If
@ -481,7 +482,7 @@ func (m *Manager) ApplyConfig(conf *config.Config) error {
groups, errs := m.loadGroups(time.Duration(conf.GlobalConfig.EvaluationInterval), files...)
if errs != nil {
for _, e := range errs {
m.logger.Errorln(e)
level.Error(m.logger).Log("msg", "loading groups failed", "err", e)
}
return errors.New("error loading rules, previous rule set restored")
}
@ -555,7 +556,7 @@ func (m *Manager) loadGroups(interval time.Duration, filenames ...string) (map[s
time.Duration(r.For),
labels.FromMap(r.Labels),
labels.FromMap(r.Annotations),
m.logger,
log.With(m.logger, "alert", r.Alert),
))
continue
}

View file

@ -22,7 +22,7 @@ import (
"testing"
"time"
"github.com/prometheus/common/log"
"github.com/go-kit/kit/log"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
@ -58,8 +58,7 @@ func TestAlertingRule(t *testing.T) {
expr,
time.Minute,
labels.FromStrings("severity", "{{\"c\"}}ritical"),
nil,
log.Base(),
nil, nil,
)
baseTime := time.Unix(0, 0)
@ -167,7 +166,7 @@ func TestStaleness(t *testing.T) {
QueryEngine: engine,
Appendable: storage,
Context: context.Background(),
Logger: log.Base(),
Logger: log.NewNopLogger(),
}
expr, err := promql.ParseExpr("a + 1")
@ -244,7 +243,7 @@ func readSeriesSet(ss storage.SeriesSet) (map[string][]promql.Point, error) {
func TestCopyState(t *testing.T) {
oldGroup := &Group{
rules: []Rule{
NewAlertingRule("alert", nil, 0, nil, nil, log.Base()),
NewAlertingRule("alert", nil, 0, nil, nil, nil),
NewRecordingRule("rule1", nil, nil),
NewRecordingRule("rule2", nil, nil),
NewRecordingRule("rule3", nil, nil),
@ -264,7 +263,7 @@ func TestCopyState(t *testing.T) {
NewRecordingRule("rule3", nil, nil),
NewRecordingRule("rule3", nil, nil),
NewRecordingRule("rule3", nil, nil),
NewAlertingRule("alert", nil, 0, nil, nil, log.Base()),
NewAlertingRule("alert", nil, 0, nil, nil, nil),
NewRecordingRule("rule1", nil, nil),
NewRecordingRule("rule4", nil, nil),
},

View file

@ -17,19 +17,23 @@ import (
"container/heap"
"strings"
"github.com/prometheus/common/log"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/prometheus/prometheus/pkg/labels"
)
type fanout struct {
logger log.Logger
primary Storage
secondaries []Storage
}
// NewFanout returns a new fan-out Storage, which proxies reads and writes
// through to multiple underlying storages.
func NewFanout(primary Storage, secondaries ...Storage) Storage {
func NewFanout(logger log.Logger, primary Storage, secondaries ...Storage) Storage {
return &fanout{
logger: logger,
primary: primary,
secondaries: secondaries,
}
@ -74,6 +78,7 @@ func (f *fanout) Appender() (Appender, error) {
secondaries = append(secondaries, appender)
}
return &fanoutAppender{
logger: f.logger,
primary: primary,
secondaries: secondaries,
}, nil
@ -97,6 +102,8 @@ func (f *fanout) Close() error {
// fanoutAppender implements Appender.
type fanoutAppender struct {
logger log.Logger
primary Appender
secondaries []Appender
}
@ -136,7 +143,7 @@ func (f *fanoutAppender) Commit() (err error) {
err = appender.Commit()
} else {
if rollbackErr := appender.Rollback(); rollbackErr != nil {
log.Errorf("Squashed rollback error on commit: %v", rollbackErr)
level.Error(f.logger).Log("msg", "Squashed rollback error on commit", "err", rollbackErr)
}
}
}
@ -151,7 +158,7 @@ func (f *fanoutAppender) Rollback() (err error) {
if err == nil {
err = rollbackErr
} else if rollbackErr != nil {
log.Errorf("Squashed rollback error on rollback: %v", rollbackErr)
level.Error(f.logger).Log("msg", "Squashed rollback error on rollback", "err", rollbackErr)
}
}
return nil
@ -370,8 +377,7 @@ func (c *mergeIterator) Seek(t int64) bool {
func (c *mergeIterator) At() (t int64, v float64) {
if len(c.h) == 0 {
log.Error("mergeIterator.At() called after .Next() returned false.")
return 0, 0
panic("mergeIterator.At() called after .Next() returned false.")
}
// TODO do I need to dedupe or just merge?

View file

@ -20,8 +20,9 @@ import (
"golang.org/x/time/rate"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/log"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/relabel"
@ -171,6 +172,8 @@ type StorageClient interface {
// QueueManager manages a queue of samples to be sent to the Storage
// indicated by the provided StorageClient.
type QueueManager struct {
logger log.Logger
cfg QueueManagerConfig
externalLabels model.LabelSet
relabelConfigs []*config.RelabelConfig
@ -190,8 +193,12 @@ type QueueManager struct {
}
// NewQueueManager builds a new QueueManager.
func NewQueueManager(cfg QueueManagerConfig, externalLabels model.LabelSet, relabelConfigs []*config.RelabelConfig, client StorageClient) *QueueManager {
func NewQueueManager(logger log.Logger, cfg QueueManagerConfig, externalLabels model.LabelSet, relabelConfigs []*config.RelabelConfig, client StorageClient) *QueueManager {
if logger == nil {
logger = log.NewNopLogger()
}
t := &QueueManager{
logger: logger,
cfg: cfg,
externalLabels: externalLabels,
relabelConfigs: relabelConfigs,
@ -244,7 +251,7 @@ func (t *QueueManager) Append(s *model.Sample) error {
} else {
droppedSamplesTotal.WithLabelValues(t.queueName).Inc()
if t.logLimiter.Allow() {
log.Warn("Remote storage queue full, discarding sample. Multiple subsequent messages of this kind may be suppressed.")
level.Warn(t.logger).Log("msg", "Remote storage queue full, discarding sample. Multiple subsequent messages of this kind may be suppressed.")
}
}
return nil
@ -272,14 +279,15 @@ func (t *QueueManager) Start() {
// Stop stops sending samples to the remote storage and waits for pending
// sends to complete.
func (t *QueueManager) Stop() {
log.Infof("Stopping remote storage...")
level.Info(t.logger).Log("msg", "Stopping remote storage...")
close(t.quit)
t.wg.Wait()
t.shardsMtx.Lock()
defer t.shardsMtx.Unlock()
t.shards.stop()
log.Info("Remote storage stopped.")
level.Info(t.logger).Log("msg", "Remote storage stopped.")
}
func (t *QueueManager) updateShardsLoop() {
@ -323,15 +331,17 @@ func (t *QueueManager) calculateDesiredShards() {
timePerSample = samplesOutDuration / samplesOut
desiredShards = (timePerSample * (samplesIn + samplesPending + t.integralAccumulator)) / float64(time.Second)
)
log.Debugf("QueueManager.caclulateDesiredShards samplesIn=%f, samplesOut=%f, samplesPending=%f, desiredShards=%f",
samplesIn, samplesOut, samplesPending, desiredShards)
level.Debug(t.logger).Log("msg", "QueueManager.caclulateDesiredShards",
"samplesIn", samplesIn, "samplesOut", samplesOut,
"samplesPending", samplesPending, "desiredShards", desiredShards)
// Changes in the number of shards must be greater than shardToleranceFraction.
var (
lowerBound = float64(t.numShards) * (1. - shardToleranceFraction)
upperBound = float64(t.numShards) * (1. + shardToleranceFraction)
)
log.Debugf("QueueManager.updateShardsLoop %f <= %f <= %f", lowerBound, desiredShards, upperBound)
level.Debug(t.logger).Log("msg", "QueueManager.updateShardsLoop",
"lowerBound", lowerBound, "desiredShards", desiredShards, "upperBound", upperBound)
if lowerBound <= desiredShards && desiredShards <= upperBound {
return
}
@ -350,10 +360,10 @@ func (t *QueueManager) calculateDesiredShards() {
// to stay close to shardUpdateDuration.
select {
case t.reshardChan <- numShards:
log.Infof("Remote storage resharding from %d to %d shards.", t.numShards, numShards)
level.Info(t.logger).Log("msg", "Remote storage resharding", "from", t.numShards, "to", numShards)
t.numShards = numShards
default:
log.Infof("Currently resharding, skipping.")
level.Info(t.logger).Log("msg", "Currently resharding, skipping.")
}
}
@ -453,9 +463,9 @@ func (s *shards) runShard(i int) {
case sample, ok := <-queue:
if !ok {
if len(pendingSamples) > 0 {
log.Debugf("Flushing %d samples to remote storage...", len(pendingSamples))
level.Debug(s.qm.logger).Log("msg", "Flushing samples to remote storage...", "count", len(pendingSamples))
s.sendSamples(pendingSamples)
log.Debugf("Done flushing.")
level.Debug(s.qm.logger).Log("msg", "Done flushing.")
}
return
}
@ -499,7 +509,7 @@ func (s *shards) sendSamplesWithBackoff(samples model.Samples) {
return
}
log.Warnf("Error sending %d samples to remote storage: %s", len(samples), err)
level.Warn(s.qm.logger).Log("msg", "Error sending samples to remote storage", "count", len(samples), "err", err)
if _, ok := err.(recoverableError); !ok {
break
}

View file

@ -99,7 +99,7 @@ func TestSampleDelivery(t *testing.T) {
cfg := defaultQueueManagerConfig
cfg.MaxShards = 1
m := NewQueueManager(cfg, nil, nil, c)
m := NewQueueManager(nil, cfg, nil, nil, c)
// These should be received by the client.
for _, s := range samples[:len(samples)/2] {
@ -133,7 +133,7 @@ func TestSampleDeliveryOrder(t *testing.T) {
c := NewTestStorageClient()
c.expectSamples(samples)
m := NewQueueManager(defaultQueueManagerConfig, nil, nil, c)
m := NewQueueManager(nil, defaultQueueManagerConfig, nil, nil, c)
// These should be received by the client.
for _, s := range samples {
@ -211,7 +211,7 @@ func TestSpawnNotMoreThanMaxConcurrentSendsGoroutines(t *testing.T) {
cfg := defaultQueueManagerConfig
cfg.MaxShards = 1
cfg.QueueCapacity = n
m := NewQueueManager(cfg, nil, nil, c)
m := NewQueueManager(nil, cfg, nil, nil, c)
m.Start()

View file

@ -16,6 +16,7 @@ package remote
import (
"sync"
"github.com/go-kit/kit/log"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/config"
)
@ -23,7 +24,8 @@ import (
// Storage represents all the remote read and write endpoints. It implements
// storage.Storage.
type Storage struct {
mtx sync.RWMutex
logger log.Logger
mtx sync.RWMutex
// For writes
queues []*QueueManager
@ -33,6 +35,13 @@ type Storage struct {
externalLabels model.LabelSet
}
func NewStorage(l log.Logger) *Storage {
if l == nil {
l = log.NewNopLogger()
}
return &Storage{logger: l}
}
// ApplyConfig updates the state as the new config requires.
func (s *Storage) ApplyConfig(conf *config.Config) error {
s.mtx.Lock()
@ -53,6 +62,7 @@ func (s *Storage) ApplyConfig(conf *config.Config) error {
return err
}
newQueues = append(newQueues, NewQueueManager(
s.logger,
defaultQueueManagerConfig,
conf.GlobalConfig.ExternalLabels,
rwConf.WriteRelabelConfigs,

View file

@ -17,6 +17,7 @@ import (
"time"
"unsafe"
"github.com/go-kit/kit/log"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
@ -55,7 +56,7 @@ type Options struct {
}
// Open returns a new storage backed by a TSDB database that is configured for Prometheus.
func Open(path string, r prometheus.Registerer, opts *Options) (*tsdb.DB, error) {
func Open(path string, l log.Logger, r prometheus.Registerer, opts *Options) (*tsdb.DB, error) {
// Start with smallest block duration and create exponential buckets until the exceed the
// configured maximum block duration.
rngs := tsdb.ExponentialBlockRanges(int64(time.Duration(opts.MinBlockDuration).Seconds()*1000), 10, 3)
@ -67,7 +68,7 @@ func Open(path string, r prometheus.Registerer, opts *Options) (*tsdb.DB, error)
}
}
db, err := tsdb.Open(path, nil, r, &tsdb.Options{
db, err := tsdb.Open(path, l, r, &tsdb.Options{
WALFlushInterval: 10 * time.Second,
RetentionDuration: uint64(time.Duration(opts.Retention).Seconds() * 1000),
BlockRanges: rngs,

View file

@ -18,7 +18,6 @@ import (
"os"
"time"
"github.com/prometheus/common/log"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/storage/tsdb"
@ -32,11 +31,9 @@ func NewStorage(t T) storage.Storage {
t.Fatalf("Opening test dir failed: %s", err)
}
log.With("dir", dir).Debugln("opening test storage")
// Tests just load data for a series sequentially. Thus we
// need a long appendable window.
db, err := tsdb.Open(dir, nil, &tsdb.Options{
db, err := tsdb.Open(dir, nil, nil, &tsdb.Options{
MinBlockDuration: model.Duration(24 * time.Hour),
MaxBlockDuration: model.Duration(24 * time.Hour),
})
@ -52,8 +49,6 @@ type testStorage struct {
}
func (s testStorage) Close() error {
log.With("dir", s.dir).Debugln("closing test storage")
if err := s.Storage.Close(); err != nil {
return err
}

View file

@ -19,8 +19,9 @@ import (
"strings"
"time"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/log"
"github.com/samuel/go-zookeeper/zk"
)
@ -50,7 +51,7 @@ type ZookeeperLogger struct {
// Implements zk.Logger
func (zl ZookeeperLogger) Printf(s string, i ...interface{}) {
zl.logger.Infof(s, i...)
level.Info(zl.logger).Log("msg", fmt.Sprintf(s, i...))
}
type ZookeeperTreeCache struct {
@ -113,20 +114,20 @@ func (tc *ZookeeperTreeCache) loop(path string) {
err := tc.recursiveNodeUpdate(path, tc.head)
if err != nil {
tc.logger.Errorf("Error during initial read of Zookeeper: %s", err)
level.Error(tc.logger).Log("msg", "Error during initial read of Zookeeper", "err", err)
failure()
}
for {
select {
case ev := <-tc.head.events:
tc.logger.Debugf("Received Zookeeper event: %s", ev)
level.Debug(tc.logger).Log("msg", "Received Zookeeper event", "event", ev)
if failureMode {
continue
}
if ev.Type == zk.EventNotWatching {
tc.logger.Infof("Lost connection to Zookeeper.")
level.Info(tc.logger).Log("msg", "Lost connection to Zookeeper.")
failure()
} else {
path := strings.TrimPrefix(ev.Path, tc.prefix)
@ -147,15 +148,15 @@ func (tc *ZookeeperTreeCache) loop(path string) {
err := tc.recursiveNodeUpdate(ev.Path, node)
if err != nil {
tc.logger.Errorf("Error during processing of Zookeeper event: %s", err)
level.Error(tc.logger).Log("msg", "Error during processing of Zookeeper event", "err", err)
failure()
} else if tc.head.data == nil {
tc.logger.Errorf("Error during processing of Zookeeper event: path %s no longer exists", tc.prefix)
level.Error(tc.logger).Log("msg", "Error during processing of Zookeeper event", "err", "path no longer exists", "path", tc.prefix)
failure()
}
}
case <-retryChan:
tc.logger.Infof("Attempting to resync state with Zookeeper")
level.Info(tc.logger).Log("msg", "Attempting to resync state with Zookeeper")
previousState := &zookeeperTreeCacheNode{
children: tc.head.children,
}
@ -163,13 +164,13 @@ func (tc *ZookeeperTreeCache) loop(path string) {
tc.head.children = make(map[string]*zookeeperTreeCacheNode)
if err := tc.recursiveNodeUpdate(tc.prefix, tc.head); err != nil {
tc.logger.Errorf("Error during Zookeeper resync: %s", err)
level.Error(tc.logger).Log("msg", "Error during Zookeeper resync", "err", err)
// Revert to our previous state.
tc.head.children = previousState.children
failure()
} else {
tc.resyncState(tc.prefix, tc.head, previousState)
tc.logger.Infof("Zookeeper resync successful")
level.Info(tc.logger).Log("Zookeeper resync successful")
failureMode = false
}
case <-tc.stop:

View file

@ -17,11 +17,11 @@ import (
"net/http"
"sort"
"github.com/go-kit/kit/log/level"
"github.com/gogo/protobuf/proto"
"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
"github.com/prometheus/common/expfmt"
"github.com/prometheus/common/log"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
@ -160,7 +160,7 @@ func (h *Handler) federation(w http.ResponseWriter, req *http.Request) {
if protMetricFam != nil {
if err := enc.Encode(protMetricFam); err != nil {
federationErrors.Inc()
log.With("err", err).Error("federation failed")
level.Error(h.logger).Log("msg", "federation failed", "err", err)
return
}
}
@ -180,7 +180,7 @@ func (h *Handler) federation(w http.ResponseWriter, req *http.Request) {
}
}
if !nameSeen {
log.With("metric", s.Metric).Warn("Ignoring nameless metric during federation.")
level.Warn(h.logger).Log("msg", "Ignoring nameless metric during federation", "metric", s.Metric)
continue
}
// Attach global labels if they do not exist yet.
@ -203,7 +203,7 @@ func (h *Handler) federation(w http.ResponseWriter, req *http.Request) {
if protMetricFam != nil {
if err := enc.Encode(protMetricFam); err != nil {
federationErrors.Inc()
log.With("err", err).Error("federation failed")
level.Error(h.logger).Log("msg", "federation failed", "err", err)
}
}
}

View file

@ -19,6 +19,7 @@ import (
"fmt"
"io"
"io/ioutil"
stdlog "log"
"net"
"net/http"
"net/http/pprof"
@ -38,10 +39,11 @@ import (
template_text "text/template"
"github.com/cockroachdb/cmux"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/opentracing-contrib/go-stdlib/nethttp"
"github.com/opentracing/opentracing-go"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/log"
"github.com/prometheus/common/model"
"github.com/prometheus/common/route"
"github.com/prometheus/prometheus/storage"
@ -67,6 +69,8 @@ var localhostRepresentations = []string{"127.0.0.1", "localhost"}
// Handler serves various HTTP endpoints of the Prometheus server
type Handler struct {
logger log.Logger
targetManager *retrieval.TargetManager
ruleManager *rules.Manager
queryEngine *promql.Engine
@ -141,15 +145,19 @@ type Options struct {
}
// New initializes a new web Handler.
func New(o *Options) *Handler {
func New(logger log.Logger, o *Options) *Handler {
router := route.New()
cwd, err := os.Getwd()
if err != nil {
cwd = "<error retrieving current working directory>"
}
if logger == nil {
logger = log.NewNopLogger()
}
h := &Handler{
logger: logger,
router: router,
quitCh: make(chan struct{}),
reloadCh: make(chan chan error),
@ -205,7 +213,7 @@ func New(o *Options) *Handler {
router.Get("/targets", readyf(instrf("targets", h.targets)))
router.Get("/version", readyf(instrf("version", h.version)))
router.Get("/heap", readyf(instrf("heap", dumpHeap)))
router.Get("/heap", readyf(instrf("heap", h.dumpHeap)))
router.Get("/metrics", prometheus.Handler().ServeHTTP)
@ -215,7 +223,7 @@ func New(o *Options) *Handler {
router.Get("/consoles/*filepath", readyf(instrf("consoles", h.consoles)))
router.Get("/static/*filepath", readyf(instrf("static", serveStaticAsset)))
router.Get("/static/*filepath", readyf(instrf("static", h.serveStaticAsset)))
if o.UserAssetsPath != "" {
router.Get("/user/*filepath", readyf(instrf("user", route.FileServe(o.UserAssetsPath))))
@ -292,20 +300,20 @@ func serveDebug(w http.ResponseWriter, req *http.Request) {
}
}
func serveStaticAsset(w http.ResponseWriter, req *http.Request) {
func (h *Handler) serveStaticAsset(w http.ResponseWriter, req *http.Request) {
fp := route.Param(req.Context(), "filepath")
fp = filepath.Join("web/ui/static", fp)
info, err := ui.AssetInfo(fp)
if err != nil {
log.With("file", fp).Warn("Could not get file info: ", err)
level.Warn(h.logger).Log("msg", "Could not get file info", "err", err, "file", fp)
w.WriteHeader(http.StatusNotFound)
return
}
file, err := ui.Asset(fp)
if err != nil {
if err != io.EOF {
log.With("file", fp).Warn("Could not get file: ", err)
level.Warn(h.logger).Log("msg", "Could not get file", "err", err, "file", fp)
}
w.WriteHeader(http.StatusNotFound)
return
@ -352,7 +360,7 @@ func (h *Handler) Reload() <-chan chan error {
// Run serves the HTTP endpoints.
func (h *Handler) Run(ctx context.Context) error {
log.Infof("Listening on %s", h.options.ListenAddress)
level.Info(h.logger).Log("msg", "Start listening for connections", "address", h.options.ListenAddress)
l, err := net.Listen("tcp", h.options.ListenAddress)
if err != nil {
@ -409,20 +417,22 @@ func (h *Handler) Run(ctx context.Context) error {
}),
))
errlog := stdlog.New(log.NewStdlibAdapter(level.Error(h.logger)), "", 0)
httpSrv := &http.Server{
Handler: nethttp.Middleware(opentracing.GlobalTracer(), mux, operationName),
ErrorLog: log.NewErrorLogger(),
ErrorLog: errlog,
ReadTimeout: h.options.ReadTimeout,
}
go func() {
if err := httpSrv.Serve(httpl); err != nil {
log.With("err", err).Warnf("error serving HTTP")
level.Warn(h.logger).Log("msg", "error serving HTTP", "err", err)
}
}()
go func() {
if err := grpcSrv.Serve(grpcl); err != nil {
log.With("err", err).Warnf("error serving HTTP")
level.Warn(h.logger).Log("msg", "error serving gRPC", "err", err)
}
}()
@ -697,11 +707,11 @@ func (h *Handler) executeTemplate(w http.ResponseWriter, name string, data inter
io.WriteString(w, result)
}
func dumpHeap(w http.ResponseWriter, r *http.Request) {
func (h *Handler) dumpHeap(w http.ResponseWriter, r *http.Request) {
target := fmt.Sprintf("/tmp/%d.heap", time.Now().Unix())
f, err := os.Create(target)
if err != nil {
log.Error("Could not dump heap: ", err)
level.Error(h.logger).Log("msg", "Could not dump heap", "err", err)
}
fmt.Fprintf(w, "Writing to %s...", target)
defer f.Close()

View file

@ -88,7 +88,7 @@ func TestReadyAndHealthy(t *testing.T) {
opts.Flags = map[string]string{}
webHandler := New(opts)
webHandler := New(nil, opts)
go webHandler.Run(context.Background())
// Give some time for the web goroutine to run since we need the server