Collect at every scrape, rather than at regular intervals.

Switch to Update using the Collecter Collect interface, due to not knowing all
metricnames in all modules beforehand we can't use Describe and thus the full
Collecter interface.

Remove 'updates', it's meaning varies by module and doesn't add much.
This commit is contained in:
Brian Brazil 2014-10-29 14:16:43 +00:00
parent 5c15c86f77
commit 1c17481a42
17 changed files with 198 additions and 291 deletions

View file

@ -29,7 +29,7 @@ func NewAttributesCollector(config Config) (Collector, error) {
for l := range c.config.Attributes { for l := range c.config.Attributes {
labelNames = append(labelNames, l) labelNames = append(labelNames, l)
} }
gv := prometheus.NewGaugeVec( attributes = prometheus.NewGaugeVec(
prometheus.GaugeOpts{ prometheus.GaugeOpts{
Namespace: Namespace, Namespace: Namespace,
Name: "attributes", Name: "attributes",
@ -37,17 +37,13 @@ func NewAttributesCollector(config Config) (Collector, error) {
}, },
labelNames, labelNames,
) )
collector, err := prometheus.RegisterOrGet(gv)
if err != nil {
return nil, err
}
attributes = collector.(*prometheus.GaugeVec)
return &c, nil return &c, nil
} }
func (c *attributesCollector) Update() (updates int, err error) { func (c *attributesCollector) Update(ch chan<- prometheus.Metric) (err error) {
glog.V(1).Info("Set node_attributes{%v}: 1", c.config.Attributes) glog.V(1).Info("Set node_attributes{%v}: 1", c.config.Attributes)
attributes.Reset() attributes.Reset()
attributes.With(c.config.Attributes).Set(1) attributes.With(c.config.Attributes).Set(1)
return updates, err attributes.Collect(ch)
return err
} }

View file

@ -41,29 +41,22 @@ func init() {
// It exposes the number of configured and active slave of linux bonding interfaces. // It exposes the number of configured and active slave of linux bonding interfaces.
func NewBondingCollector(config Config) (Collector, error) { func NewBondingCollector(config Config) (Collector, error) {
c := bondingCollector{} c := bondingCollector{}
if _, err := prometheus.RegisterOrGet(bondingSlaves); err != nil {
return nil, err
}
if _, err := prometheus.RegisterOrGet(bondingSlavesActive); err != nil {
return nil, err
}
return &c, nil return &c, nil
} }
// Update reads and exposes bonding states, implements Collector interface. Caution: This works only on linux. // Update reads and exposes bonding states, implements Collector interface. Caution: This works only on linux.
func (c *bondingCollector) Update() (int, error) { func (c *bondingCollector) Update(ch chan<- prometheus.Metric) (err error) {
bondingStats, err := readBondingStats(sysfsNet) bondingStats, err := readBondingStats(sysfsNet)
if err != nil { if err != nil {
return 0, err return err
} }
updates := 0
for master, status := range bondingStats { for master, status := range bondingStats {
bondingSlaves.WithLabelValues(master).Set(float64(status[0])) bondingSlaves.WithLabelValues(master).Set(float64(status[0]))
updates++
bondingSlavesActive.WithLabelValues(master).Set(float64(status[1])) bondingSlavesActive.WithLabelValues(master).Set(float64(status[1]))
updates++
} }
return updates, nil bondingSlaves.Collect(ch)
bondingSlavesActive.Collect(ch)
return nil
} }
func readBondingStats(root string) (status map[string][2]int, err error) { func readBondingStats(root string) (status map[string][2]int, err error) {

View file

@ -1,6 +1,10 @@
// Exporter is a prometheus exporter using multiple Factories to collect and export system metrics. // Exporter is a prometheus exporter using multiple Factories to collect and export system metrics.
package collector package collector
import (
"github.com/prometheus/client_golang/prometheus"
)
const Namespace = "node" const Namespace = "node"
var Factories = make(map[string]func(Config) (Collector, error)) var Factories = make(map[string]func(Config) (Collector, error))
@ -8,7 +12,7 @@ var Factories = make(map[string]func(Config) (Collector, error))
// Interface a collector has to implement. // Interface a collector has to implement.
type Collector interface { type Collector interface {
// Get new metrics and expose them via prometheus registry. // Get new metrics and expose them via prometheus registry.
Update() (n int, err error) Update(ch chan<- prometheus.Metric) (err error)
} }
// TODO: Instead of periodically call Update, a Collector could be implemented // TODO: Instead of periodically call Update, a Collector could be implemented

View file

@ -146,19 +146,13 @@ func NewDiskstatsCollector(config Config) (Collector, error) {
config: config, config: config,
ignoredDevicesPattern: regexp.MustCompile(*ignoredDevices), ignoredDevicesPattern: regexp.MustCompile(*ignoredDevices),
} }
for _, c := range diskStatsMetrics {
if _, err := prometheus.RegisterOrGet(c); err != nil {
return nil, err
}
}
return &c, nil return &c, nil
} }
func (c *diskstatsCollector) Update() (updates int, err error) { func (c *diskstatsCollector) Update(ch chan<- prometheus.Metric) (err error) {
diskStats, err := getDiskStats() diskStats, err := getDiskStats()
if err != nil { if err != nil {
return updates, fmt.Errorf("Couldn't get diskstats: %s", err) return fmt.Errorf("Couldn't get diskstats: %s", err)
} }
for dev, stats := range diskStats { for dev, stats := range diskStats {
if c.ignoredDevicesPattern.MatchString(dev) { if c.ignoredDevicesPattern.MatchString(dev) {
@ -166,10 +160,9 @@ func (c *diskstatsCollector) Update() (updates int, err error) {
continue continue
} }
for k, value := range stats { for k, value := range stats {
updates++
v, err := strconv.ParseFloat(value, 64) v, err := strconv.ParseFloat(value, 64)
if err != nil { if err != nil {
return updates, fmt.Errorf("Invalid value %s in diskstats: %s", value, err) return fmt.Errorf("Invalid value %s in diskstats: %s", value, err)
} }
counter, ok := diskStatsMetrics[k].(*prometheus.CounterVec) counter, ok := diskStatsMetrics[k].(*prometheus.CounterVec)
if ok { if ok {
@ -180,7 +173,10 @@ func (c *diskstatsCollector) Update() (updates int, err error) {
} }
} }
} }
return updates, err for _, c := range diskStatsMetrics {
c.Collect(ch)
}
return err
} }
func getDiskStats() (map[string]map[int]string, error) { func getDiskStats() (map[string]map[int]string, error) {

View file

@ -88,29 +88,14 @@ func NewFilesystemCollector(config Config) (Collector, error) {
config: config, config: config,
ignoredMountPointsPattern: regexp.MustCompile(*ignoredMountPoints), ignoredMountPointsPattern: regexp.MustCompile(*ignoredMountPoints),
} }
if _, err := prometheus.RegisterOrGet(fsSizeMetric); err != nil {
return nil, err
}
if _, err := prometheus.RegisterOrGet(fsFreeMetric); err != nil {
return nil, err
}
if _, err := prometheus.RegisterOrGet(fsAvailMetric); err != nil {
return nil, err
}
if _, err := prometheus.RegisterOrGet(fsFilesMetric); err != nil {
return nil, err
}
if _, err := prometheus.RegisterOrGet(fsFilesFreeMetric); err != nil {
return nil, err
}
return &c, nil return &c, nil
} }
// Expose filesystem fullness. // Expose filesystem fullness.
func (c *filesystemCollector) Update() (updates int, err error) { func (c *filesystemCollector) Update(ch chan<- prometheus.Metric) (err error) {
mps, err := mountPoints() mps, err := mountPoints()
if err != nil { if err != nil {
return updates, err return err
} }
for _, mp := range mps { for _, mp := range mps {
if c.ignoredMountPointsPattern.MatchString(mp) { if c.ignoredMountPointsPattern.MatchString(mp) {
@ -120,16 +105,20 @@ func (c *filesystemCollector) Update() (updates int, err error) {
buf := new(syscall.Statfs_t) buf := new(syscall.Statfs_t)
err := syscall.Statfs(mp, buf) err := syscall.Statfs(mp, buf)
if err != nil { if err != nil {
return updates, fmt.Errorf("Statfs on %s returned %s", mp, err) return fmt.Errorf("Statfs on %s returned %s", mp, err)
} }
fsSizeMetric.WithLabelValues(mp).Set(float64(buf.Blocks) * float64(buf.Bsize)) fsSizeMetric.WithLabelValues(mp).Set(float64(buf.Blocks) * float64(buf.Bsize))
fsFreeMetric.WithLabelValues(mp).Set(float64(buf.Bfree) * float64(buf.Bsize)) fsFreeMetric.WithLabelValues(mp).Set(float64(buf.Bfree) * float64(buf.Bsize))
fsAvailMetric.WithLabelValues(mp).Set(float64(buf.Bavail) * float64(buf.Bsize)) fsAvailMetric.WithLabelValues(mp).Set(float64(buf.Bavail) * float64(buf.Bsize))
fsFilesMetric.WithLabelValues(mp).Set(float64(buf.Files)) fsFilesMetric.WithLabelValues(mp).Set(float64(buf.Files))
fsFilesFreeMetric.WithLabelValues(mp).Set(float64(buf.Ffree)) fsFilesFreeMetric.WithLabelValues(mp).Set(float64(buf.Ffree))
updates++
} }
return updates, err fsSizeMetric.Collect(ch)
fsFreeMetric.Collect(ch)
fsAvailMetric.Collect(ch)
fsFilesMetric.Collect(ch)
fsFilesFreeMetric.Collect(ch)
return err
} }
func mountPoints() ([]string, error) { func mountPoints() ([]string, error) {

View file

@ -68,17 +68,16 @@ func (c *gmondCollector) setMetric(name, cluster string, metric ganglia.Metric)
}, },
[]string{"cluster"}, []string{"cluster"},
) )
c.Metrics[name] = prometheus.MustRegisterOrGet(gv).(*prometheus.GaugeVec)
} }
glog.V(1).Infof("Set %s{cluster=%q}: %f", name, cluster, metric.Value) glog.V(1).Infof("Set %s{cluster=%q}: %f", name, cluster, metric.Value)
c.Metrics[name].WithLabelValues(cluster).Set(metric.Value) c.Metrics[name].WithLabelValues(cluster).Set(metric.Value)
} }
func (c *gmondCollector) Update() (updates int, err error) { func (c *gmondCollector) Update(ch chan<- prometheus.Metric) (err error) {
conn, err := net.Dial(gangliaProto, gangliaAddress) conn, err := net.Dial(gangliaProto, gangliaAddress)
glog.V(1).Infof("gmondCollector Update") glog.V(1).Infof("gmondCollector Update")
if err != nil { if err != nil {
return updates, fmt.Errorf("Can't connect to gmond: %s", err) return fmt.Errorf("Can't connect to gmond: %s", err)
} }
conn.SetDeadline(time.Now().Add(gangliaTimeout)) conn.SetDeadline(time.Now().Add(gangliaTimeout))
@ -88,7 +87,7 @@ func (c *gmondCollector) Update() (updates int, err error) {
err = decoder.Decode(&ganglia) err = decoder.Decode(&ganglia)
if err != nil { if err != nil {
return updates, fmt.Errorf("Couldn't parse xml: %s", err) return fmt.Errorf("Couldn't parse xml: %s", err)
} }
for _, cluster := range ganglia.Clusters { for _, cluster := range ganglia.Clusters {
@ -98,11 +97,13 @@ func (c *gmondCollector) Update() (updates int, err error) {
name := illegalCharsRE.ReplaceAllString(metric.Name, "_") name := illegalCharsRE.ReplaceAllString(metric.Name, "_")
c.setMetric(name, cluster.Name, metric) c.setMetric(name, cluster.Name, metric)
updates++
} }
} }
} }
return updates, err for _, m := range c.Metrics {
m.Collect(ch)
}
return err
} }
func toUtf8(charset string, input io.Reader) (io.Reader, error) { func toUtf8(charset string, input io.Reader) (io.Reader, error) {

View file

@ -42,23 +42,19 @@ func NewInterruptsCollector(config Config) (Collector, error) {
c := interruptsCollector{ c := interruptsCollector{
config: config, config: config,
} }
if _, err := prometheus.RegisterOrGet(interruptsMetric); err != nil {
return nil, err
}
return &c, nil return &c, nil
} }
func (c *interruptsCollector) Update() (updates int, err error) { func (c *interruptsCollector) Update(ch chan<- prometheus.Metric) (err error) {
interrupts, err := getInterrupts() interrupts, err := getInterrupts()
if err != nil { if err != nil {
return updates, fmt.Errorf("Couldn't get interrupts: %s", err) return fmt.Errorf("Couldn't get interrupts: %s", err)
} }
for name, interrupt := range interrupts { for name, interrupt := range interrupts {
for cpuNo, value := range interrupt.values { for cpuNo, value := range interrupt.values {
updates++
fv, err := strconv.ParseFloat(value, 64) fv, err := strconv.ParseFloat(value, 64)
if err != nil { if err != nil {
return updates, fmt.Errorf("Invalid value %s in interrupts: %s", value, err) return fmt.Errorf("Invalid value %s in interrupts: %s", value, err)
} }
labels := prometheus.Labels{ labels := prometheus.Labels{
"CPU": strconv.Itoa(cpuNo), "CPU": strconv.Itoa(cpuNo),
@ -69,7 +65,8 @@ func (c *interruptsCollector) Update() (updates int, err error) {
interruptsMetric.With(labels).Set(fv) interruptsMetric.With(labels).Set(fv)
} }
} }
return updates, err interruptsMetric.Collect(ch)
return err
} }
type interrupt struct { type interrupt struct {

View file

@ -39,21 +39,18 @@ func NewLastLoginCollector(config Config) (Collector, error) {
c := lastLoginCollector{ c := lastLoginCollector{
config: config, config: config,
} }
if _, err := prometheus.RegisterOrGet(lastSeen); err != nil {
return nil, err
}
return &c, nil return &c, nil
} }
func (c *lastLoginCollector) Update() (updates int, err error) { func (c *lastLoginCollector) Update(ch chan<- prometheus.Metric) (err error) {
last, err := getLastLoginTime() last, err := getLastLoginTime()
if err != nil { if err != nil {
return updates, fmt.Errorf("Couldn't get last seen: %s", err) return fmt.Errorf("Couldn't get last seen: %s", err)
} }
updates++
glog.V(1).Infof("Set node_last_login_time: %f", last) glog.V(1).Infof("Set node_last_login_time: %f", last)
lastSeen.Set(last) lastSeen.Set(last)
return updates, err lastSeen.Collect(ch)
return err
} }
func getLastLoginTime() (float64, error) { func getLastLoginTime() (float64, error) {

View file

@ -38,23 +38,18 @@ func NewLoadavgCollector(config Config) (Collector, error) {
c := loadavgCollector{ c := loadavgCollector{
config: config, config: config,
} }
if _, err := prometheus.RegisterOrGet(load1); err != nil {
return nil, err
}
return &c, nil return &c, nil
} }
func (c *loadavgCollector) Update() (updates int, err error) { func (c *loadavgCollector) Update(ch chan<- prometheus.Metric) (err error) {
load, err := getLoad1() load, err := getLoad1()
if err != nil { if err != nil {
return updates, fmt.Errorf("Couldn't get load: %s", err) return fmt.Errorf("Couldn't get load: %s", err)
} }
updates++
glog.V(1).Infof("Set node_load: %f", load) glog.V(1).Infof("Set node_load: %f", load)
load1.Set(load) load1.Set(load)
load1.Collect(ch)
return updates, err return err
} }
func getLoad1() (float64, error) { func getLoad1() (float64, error) {

View file

@ -130,104 +130,92 @@ func NewMegaCliCollector(config Config) (Collector, error) {
config: config, config: config,
cli: cli, cli: cli,
} }
if _, err := prometheus.RegisterOrGet(driveTemperature); err != nil {
return nil, err
}
if _, err := prometheus.RegisterOrGet(driveCounters); err != nil {
return nil, err
}
if _, err := prometheus.RegisterOrGet(drivePresence); err != nil {
return nil, err
}
return &c, nil return &c, nil
} }
func (c *megaCliCollector) Update() (updates int, err error) { func (c *megaCliCollector) Update(ch chan<- prometheus.Metric) (err error) {
au, err := c.updateAdapter() err = c.updateAdapter()
if err != nil { if err != nil {
return au, err return err
} }
du, err := c.updateDisks() err = c.updateDisks()
return au + du, err driveTemperature.Collect(ch)
driveCounters.Collect(ch)
drivePresence.Collect(ch)
return err
} }
func (c *megaCliCollector) updateAdapter() (int, error) { func (c *megaCliCollector) updateAdapter() error {
cmd := exec.Command(c.cli, "-AdpAllInfo", "-aALL") cmd := exec.Command(c.cli, "-AdpAllInfo", "-aALL")
pipe, err := cmd.StdoutPipe() pipe, err := cmd.StdoutPipe()
if err != nil { if err != nil {
return 0, err return err
} }
if err := cmd.Start(); err != nil { if err := cmd.Start(); err != nil {
return 0, err return err
} }
stats, err := parseMegaCliAdapter(pipe) stats, err := parseMegaCliAdapter(pipe)
if err != nil { if err != nil {
return 0, err return err
} }
if err := cmd.Wait(); err != nil { if err := cmd.Wait(); err != nil {
return 0, err return err
} }
updates := 0
for k, v := range stats["Device Present"] { for k, v := range stats["Device Present"] {
value, err := strconv.ParseFloat(v, 64) value, err := strconv.ParseFloat(v, 64)
if err != nil { if err != nil {
return updates, err return err
} }
drivePresence.WithLabelValues(k).Set(value) drivePresence.WithLabelValues(k).Set(value)
updates++
} }
return updates, nil return nil
} }
func (c *megaCliCollector) updateDisks() (int, error) { func (c *megaCliCollector) updateDisks() error {
cmd := exec.Command(c.cli, "-PDList", "-aALL") cmd := exec.Command(c.cli, "-PDList", "-aALL")
pipe, err := cmd.StdoutPipe() pipe, err := cmd.StdoutPipe()
if err != nil { if err != nil {
return 0, err return err
} }
if err := cmd.Start(); err != nil { if err := cmd.Start(); err != nil {
return 0, err return err
} }
stats, err := parseMegaCliDisks(pipe) stats, err := parseMegaCliDisks(pipe)
if err != nil { if err != nil {
return 0, err return err
} }
if err := cmd.Wait(); err != nil { if err := cmd.Wait(); err != nil {
return 0, err return err
} }
updates := 0
for enc, encStats := range stats { for enc, encStats := range stats {
for slot, slotStats := range encStats { for slot, slotStats := range encStats {
tStr := slotStats["Drive Temperature"] tStr := slotStats["Drive Temperature"]
tStr = tStr[:strings.Index(tStr, "C")] tStr = tStr[:strings.Index(tStr, "C")]
t, err := strconv.ParseFloat(tStr, 64) t, err := strconv.ParseFloat(tStr, 64)
if err != nil { if err != nil {
return updates, err return err
} }
encStr := strconv.Itoa(enc) encStr := strconv.Itoa(enc)
slotStr := strconv.Itoa(slot) slotStr := strconv.Itoa(slot)
driveTemperature.WithLabelValues(encStr, slotStr).Set(t) driveTemperature.WithLabelValues(encStr, slotStr).Set(t)
updates++
for _, c := range counters { for _, c := range counters {
counter, err := strconv.ParseFloat(slotStats[c], 64) counter, err := strconv.ParseFloat(slotStats[c], 64)
if err != nil { if err != nil {
return updates, err return err
} }
driveCounters.WithLabelValues(encStr, slotStr, c).Set(counter) driveCounters.WithLabelValues(encStr, slotStr, c).Set(counter)
updates++
} }
} }
} }
return updates, nil return nil
} }

View file

@ -41,26 +41,25 @@ func NewMeminfoCollector(config Config) (Collector, error) {
return &c, nil return &c, nil
} }
func (c *meminfoCollector) Update() (updates int, err error) { func (c *meminfoCollector) Update(ch chan<- prometheus.Metric) (err error) {
memInfo, err := getMemInfo() memInfo, err := getMemInfo()
if err != nil { if err != nil {
return updates, fmt.Errorf("Couldn't get meminfo: %s", err) return fmt.Errorf("Couldn't get meminfo: %s", err)
} }
glog.V(1).Infof("Set node_mem: %#v", memInfo) glog.V(1).Infof("Set node_mem: %#v", memInfo)
for k, v := range memInfo { for k, v := range memInfo {
if _, ok := memInfoMetrics[k]; !ok { if _, ok := memInfoMetrics[k]; !ok {
gauge := prometheus.NewGauge(prometheus.GaugeOpts{ memInfoMetrics[k] = prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: Namespace, Namespace: Namespace,
Subsystem: memInfoSubsystem, Subsystem: memInfoSubsystem,
Name: k, Name: k,
Help: k + " from /proc/meminfo.", Help: k + " from /proc/meminfo.",
}) })
memInfoMetrics[k] = prometheus.MustRegisterOrGet(gauge).(prometheus.Gauge)
} }
updates++
memInfoMetrics[k].Set(v) memInfoMetrics[k].Set(v)
memInfoMetrics[k].Collect(ch)
} }
return updates, err return err
} }
func getMemInfo() (map[string]float64, error) { func getMemInfo() (map[string]float64, error) {

View file

@ -39,17 +39,17 @@ func NewNetDevCollector(config Config) (Collector, error) {
return &c, nil return &c, nil
} }
func (c *netDevCollector) Update() (updates int, err error) { func (c *netDevCollector) Update(ch chan<- prometheus.Metric) (err error) {
netStats, err := getNetStats() netStats, err := getNetStats()
if err != nil { if err != nil {
return updates, fmt.Errorf("Couldn't get netstats: %s", err) return fmt.Errorf("Couldn't get netstats: %s", err)
} }
for direction, devStats := range netStats { for direction, devStats := range netStats {
for dev, stats := range devStats { for dev, stats := range devStats {
for t, value := range stats { for t, value := range stats {
key := direction + "_" + t key := direction + "_" + t
if _, ok := netStatsMetrics[key]; !ok { if _, ok := netStatsMetrics[key]; !ok {
gv := prometheus.NewGaugeVec( netStatsMetrics[key] = prometheus.NewGaugeVec(
prometheus.GaugeOpts{ prometheus.GaugeOpts{
Namespace: Namespace, Namespace: Namespace,
Subsystem: netStatsSubsystem, Subsystem: netStatsSubsystem,
@ -58,18 +58,19 @@ func (c *netDevCollector) Update() (updates int, err error) {
}, },
[]string{"device"}, []string{"device"},
) )
netStatsMetrics[key] = prometheus.MustRegisterOrGet(gv).(*prometheus.GaugeVec)
} }
updates++
v, err := strconv.ParseFloat(value, 64) v, err := strconv.ParseFloat(value, 64)
if err != nil { if err != nil {
return updates, fmt.Errorf("Invalid value %s in netstats: %s", value, err) return fmt.Errorf("Invalid value %s in netstats: %s", value, err)
} }
netStatsMetrics[key].WithLabelValues(dev).Set(v) netStatsMetrics[key].WithLabelValues(dev).Set(v)
} }
} }
} }
return updates, err for _, m := range netStatsMetrics {
m.Collect(ch)
}
return err
} }
func getNetStats() (map[string]map[string]map[string]string, error) { func getNetStats() (map[string]map[string]map[string]string, error) {

View file

@ -36,21 +36,17 @@ func NewNtpCollector(config Config) (Collector, error) {
} }
c := ntpCollector{} c := ntpCollector{}
if _, err := prometheus.RegisterOrGet(ntpDrift); err != nil {
return nil, err
}
return &c, nil return &c, nil
} }
func (c *ntpCollector) Update() (updates int, err error) { func (c *ntpCollector) Update(ch chan<- prometheus.Metric) (err error) {
t, err := ntp.Time(*ntpServer) t, err := ntp.Time(*ntpServer)
if err != nil { if err != nil {
return updates, fmt.Errorf("Couldn't get ntp drift: %s", err) return fmt.Errorf("Couldn't get ntp drift: %s", err)
} }
drift := t.Sub(time.Now()) drift := t.Sub(time.Now())
updates++
glog.V(1).Infof("Set ntp_drift_seconds: %f", drift.Seconds()) glog.V(1).Infof("Set ntp_drift_seconds: %f", drift.Seconds())
ntpDrift.Set(drift.Seconds()) ntpDrift.Set(drift.Seconds())
ntpDrift.Collect(ch)
return updates, err return err
} }

View file

@ -50,23 +50,13 @@ func NewRunitCollector(config Config) (Collector, error) {
config: config, config: config,
} }
if _, err := prometheus.RegisterOrGet(runitState); err != nil {
return nil, err
}
if _, err := prometheus.RegisterOrGet(runitStateDesired); err != nil {
return nil, err
}
if _, err := prometheus.RegisterOrGet(runitStateNormal); err != nil {
return nil, err
}
return &c, nil return &c, nil
} }
func (c *runitCollector) Update() (updates int, err error) { func (c *runitCollector) Update(ch chan<- prometheus.Metric) (err error) {
services, err := runit.GetServices("/etc/service") services, err := runit.GetServices("/etc/service")
if err != nil { if err != nil {
return 0, err return err
} }
for _, service := range services { for _, service := range services {
@ -84,8 +74,9 @@ func (c *runitCollector) Update() (updates int, err error) {
} else { } else {
runitStateNormal.WithLabelValues(service.Name).Set(1) runitStateNormal.WithLabelValues(service.Name).Set(1)
} }
updates += 3
} }
runitState.Collect(ch)
return updates, err runitStateDesired.Collect(ch)
runitStateNormal.Collect(ch)
return err
} }

View file

@ -73,35 +73,14 @@ func NewStatCollector(config Config) (Collector, error) {
c := statCollector{ c := statCollector{
config: config, config: config,
} }
if _, err := prometheus.RegisterOrGet(cpuMetrics); err != nil {
return nil, err
}
if _, err := prometheus.RegisterOrGet(intrMetric); err != nil {
return nil, err
}
if _, err := prometheus.RegisterOrGet(ctxtMetric); err != nil {
return nil, err
}
if _, err := prometheus.RegisterOrGet(forksMetric); err != nil {
return nil, err
}
if _, err := prometheus.RegisterOrGet(btimeMetric); err != nil {
return nil, err
}
if _, err := prometheus.RegisterOrGet(procsRunningMetric); err != nil {
return nil, err
}
if _, err := prometheus.RegisterOrGet(procsBlockedMetric); err != nil {
return nil, err
}
return &c, nil return &c, nil
} }
// Expose a variety of stats from /proc/stats. // Expose a variety of stats from /proc/stats.
func (c *statCollector) Update() (updates int, err error) { func (c *statCollector) Update(ch chan<- prometheus.Metric) (err error) {
file, err := os.Open(procStat) file, err := os.Open(procStat)
if err != nil { if err != nil {
return updates, err return err
} }
defer file.Close() defer file.Close()
@ -119,7 +98,7 @@ func (c *statCollector) Update() (updates int, err error) {
for i, v := range parts[1 : len(cpuFields)+1] { for i, v := range parts[1 : len(cpuFields)+1] {
value, err := strconv.ParseFloat(v, 64) value, err := strconv.ParseFloat(v, 64)
if err != nil { if err != nil {
return updates, err return err
} }
// Convert from ticks to seconds // Convert from ticks to seconds
value /= float64(C.sysconf(C._SC_CLK_TCK)) value /= float64(C.sysconf(C._SC_CLK_TCK))
@ -129,40 +108,47 @@ func (c *statCollector) Update() (updates int, err error) {
// Only expose the overall number, use the 'interrupts' collector for more detail. // Only expose the overall number, use the 'interrupts' collector for more detail.
value, err := strconv.ParseFloat(parts[1], 64) value, err := strconv.ParseFloat(parts[1], 64)
if err != nil { if err != nil {
return updates, err return err
} }
intrMetric.Set(value) intrMetric.Set(value)
case parts[0] == "ctxt": case parts[0] == "ctxt":
value, err := strconv.ParseFloat(parts[1], 64) value, err := strconv.ParseFloat(parts[1], 64)
if err != nil { if err != nil {
return updates, err return err
} }
ctxtMetric.Set(value) ctxtMetric.Set(value)
case parts[0] == "processes": case parts[0] == "processes":
value, err := strconv.ParseFloat(parts[1], 64) value, err := strconv.ParseFloat(parts[1], 64)
if err != nil { if err != nil {
return updates, err return err
} }
forksMetric.Set(value) forksMetric.Set(value)
case parts[0] == "btime": case parts[0] == "btime":
value, err := strconv.ParseFloat(parts[1], 64) value, err := strconv.ParseFloat(parts[1], 64)
if err != nil { if err != nil {
return updates, err return err
} }
btimeMetric.Set(value) btimeMetric.Set(value)
case parts[0] == "procs_running": case parts[0] == "procs_running":
value, err := strconv.ParseFloat(parts[1], 64) value, err := strconv.ParseFloat(parts[1], 64)
if err != nil { if err != nil {
return updates, err return err
} }
procsRunningMetric.Set(value) procsRunningMetric.Set(value)
case parts[0] == "procs_blocked": case parts[0] == "procs_blocked":
value, err := strconv.ParseFloat(parts[1], 64) value, err := strconv.ParseFloat(parts[1], 64)
if err != nil { if err != nil {
return updates, err return err
} }
procsBlockedMetric.Set(value) procsBlockedMetric.Set(value)
} }
} }
return updates, err cpuMetrics.Collect(ch)
ctxtMetric.Collect(ch)
intrMetric.Collect(ch)
forksMetric.Collect(ch)
btimeMetric.Collect(ch)
procsRunningMetric.Collect(ch)
procsBlockedMetric.Collect(ch)
return err
} }

View file

@ -32,16 +32,13 @@ func NewTimeCollector(config Config) (Collector, error) {
config: config, config: config,
} }
if _, err := prometheus.RegisterOrGet(systemTime); err != nil {
return nil, err
}
return &c, nil return &c, nil
} }
func (c *timeCollector) Update() (updates int, err error) { func (c *timeCollector) Update(ch chan<- prometheus.Metric) (err error) {
updates++
now := time.Now() now := time.Now()
glog.V(1).Infof("Set time: %f", now.Unix()) glog.V(1).Infof("Set time: %f", now.Unix())
systemTime.Set(float64(now.Unix())) systemTime.Set(float64(now.Unix()))
return updates, err systemTime.Collect(ch)
return err
} }

View file

@ -28,7 +28,6 @@ var (
listeningAddress = flag.String("listen", ":8080", "address to listen on") listeningAddress = flag.String("listen", ":8080", "address to listen on")
enabledCollectors = flag.String("enabledCollectors", "attributes,diskstats,filesystem,loadavg,meminfo,stat,time,netdev", "comma-seperated list of collectors to use") enabledCollectors = flag.String("enabledCollectors", "attributes,diskstats,filesystem,loadavg,meminfo,stat,time,netdev", "comma-seperated list of collectors to use")
printCollectors = flag.Bool("printCollectors", false, "If true, print available collectors and exit") printCollectors = flag.Bool("printCollectors", false, "If true, print available collectors and exit")
interval = flag.Duration("interval", 60*time.Second, "refresh interval")
collectorLabelNames = []string{"collector", "result"} collectorLabelNames = []string{"collector", "result"}
@ -41,78 +40,56 @@ var (
}, },
collectorLabelNames, collectorLabelNames,
) )
metricsUpdated = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: collector.Namespace,
Subsystem: subsystem,
Name: "metrics_updated",
Help: "node_exporter: Number of metrics updated.",
},
collectorLabelNames,
)
) )
func main() { // Implements Collector.
flag.Parse() type NodeCollector struct {
if *printCollectors { collectors map[string]collector.Collector
fmt.Printf("Available collectors:\n") }
for n, _ := range collector.Factories {
fmt.Printf(" - %s\n", n) // Implements Collector.
} func (n NodeCollector) Describe(ch chan<- *prometheus.Desc) {
return scrapeDurations.Describe(ch)
}
// Implements Collector.
func (n NodeCollector) Collect(ch chan<- prometheus.Metric) {
wg := sync.WaitGroup{}
wg.Add(len(n.collectors))
for name, c := range n.collectors {
go func(name string, c collector.Collector) {
Execute(name, c, ch)
wg.Done()
}(name, c)
} }
collectors, err := loadCollectors(*configFile) wg.Wait()
scrapeDurations.Collect(ch)
}
func Execute(name string, c collector.Collector, ch chan<- prometheus.Metric) {
begin := time.Now()
err := c.Update(ch)
duration := time.Since(begin)
var result string
if err != nil { if err != nil {
log.Fatalf("Couldn't load config and collectors: %s", err) glog.Infof("ERROR: %s failed after %fs: %s", name, duration.Seconds(), err)
result = "error"
} else {
glog.Infof("OK: %s success after %fs.", name, duration.Seconds())
result = "success"
} }
scrapeDurations.WithLabelValues(name, result).Observe(duration.Seconds())
}
prometheus.MustRegister(scrapeDurations) func getConfig(file string) (*collector.Config, error) {
prometheus.MustRegister(metricsUpdated) config := &collector.Config{}
glog.Infof("Reading config %s", *configFile)
glog.Infof("Enabled collectors:") bytes, err := ioutil.ReadFile(*configFile)
for n, _ := range collectors { if err != nil {
glog.Infof(" - %s", n) return nil, err
} }
return config, json.Unmarshal(bytes, &config)
sigHup := make(chan os.Signal)
sigUsr1 := make(chan os.Signal)
signal.Notify(sigHup, syscall.SIGHUP)
signal.Notify(sigUsr1, syscall.SIGUSR1)
go serveStatus()
glog.Infof("Starting initial collection")
collect(collectors)
tick := time.Tick(*interval)
for {
select {
case <-sigHup:
collectors, err = loadCollectors(*configFile)
if err != nil {
log.Fatalf("Couldn't load config and collectors: %s", err)
}
glog.Infof("Reloaded collectors and config")
tick = time.Tick(*interval)
case <-tick:
glog.Infof("Starting new interval")
collect(collectors)
case <-sigUsr1:
glog.Infof("got signal")
if *memProfile != "" {
glog.Infof("Writing memory profile to %s", *memProfile)
f, err := os.Create(*memProfile)
if err != nil {
log.Fatal(err)
}
pprof.WriteHeapProfile(f)
f.Close()
}
}
}
} }
func loadCollectors(file string) (map[string]collector.Collector, error) { func loadCollectors(file string) (map[string]collector.Collector, error) {
@ -135,46 +112,50 @@ func loadCollectors(file string) (map[string]collector.Collector, error) {
return collectors, nil return collectors, nil
} }
func getConfig(file string) (*collector.Config, error) { func main() {
config := &collector.Config{} flag.Parse()
glog.Infof("Reading config %s", *configFile) if *printCollectors {
bytes, err := ioutil.ReadFile(*configFile) fmt.Printf("Available collectors:\n")
for n, _ := range collector.Factories {
fmt.Printf(" - %s\n", n)
}
return
}
collectors, err := loadCollectors(*configFile)
if err != nil { if err != nil {
return nil, err log.Fatalf("Couldn't load config and collectors: %s", err)
} }
return config, json.Unmarshal(bytes, &config)
}
func serveStatus() { glog.Infof("Enabled collectors:")
http.Handle("/metrics", prometheus.Handler()) for n, _ := range collectors {
http.ListenAndServe(*listeningAddress, nil) glog.Infof(" - %s", n)
}
func collect(collectors map[string]collector.Collector) {
wg := sync.WaitGroup{}
wg.Add(len(collectors))
for n, c := range collectors {
go func(n string, c collector.Collector) {
Execute(n, c)
wg.Done()
}(n, c)
} }
wg.Wait()
}
func Execute(name string, c collector.Collector) { nodeCollector := NodeCollector{collectors: collectors}
begin := time.Now() prometheus.MustRegister(nodeCollector)
updates, err := c.Update()
duration := time.Since(begin)
var result string
if err != nil { sigUsr1 := make(chan os.Signal)
glog.Infof("ERROR: %s failed after %fs: %s", name, duration.Seconds(), err) signal.Notify(sigUsr1, syscall.SIGUSR1)
result = "error"
} else { go func() {
glog.Infof("OK: %s success after %fs.", name, duration.Seconds()) http.Handle("/metrics", prometheus.Handler())
result = "success" http.ListenAndServe(*listeningAddress, nil)
}()
for {
select {
case <-sigUsr1:
glog.Infof("got signal")
if *memProfile != "" {
glog.Infof("Writing memory profile to %s", *memProfile)
f, err := os.Create(*memProfile)
if err != nil {
log.Fatal(err)
}
pprof.WriteHeapProfile(f)
f.Close()
}
}
} }
scrapeDurations.WithLabelValues(name, result).Observe(duration.Seconds())
metricsUpdated.WithLabelValues(name, result).Set(float64(updates))
} }