mirror of
https://github.com/prometheus/prometheus.git
synced 2025-01-24 04:02:04 -08:00
Migrate to new client_golang.
This change will only be submitted when the new client_golang has been moved to the new version.

Change-Id: Ifceb59333072a08286a8ac910709a8ba2e3a1581
This commit is contained in:
parent
8da3429e45
commit
2128d9d811
6
main.go
6
main.go
|
@ -23,6 +23,7 @@ import (
|
|||
|
||||
"github.com/golang/glog"
|
||||
"github.com/prometheus/client_golang/extraction"
|
||||
registry "github.com/prometheus/client_golang/prometheus"
|
||||
|
||||
clientmodel "github.com/prometheus/client_golang/model"
|
||||
|
||||
|
@ -244,13 +245,15 @@ func main() {
|
|||
if err != nil {
|
||||
glog.Fatal("Error opening storage: ", err)
|
||||
}
|
||||
registry.MustRegister(ts)
|
||||
|
||||
var remoteTSDBQueue *remote.TSDBQueueManager = nil
|
||||
var remoteTSDBQueue *remote.TSDBQueueManager
|
||||
if *remoteTSDBUrl == "" {
|
||||
glog.Warningf("No TSDB URL provided; not sending any samples to long-term storage")
|
||||
} else {
|
||||
openTSDB := opentsdb.NewClient(*remoteTSDBUrl, *remoteTSDBTimeout)
|
||||
remoteTSDBQueue = remote.NewTSDBQueueManager(openTSDB, 512)
|
||||
registry.MustRegister(remoteTSDBQueue)
|
||||
go remoteTSDBQueue.Run()
|
||||
}
|
||||
|
||||
|
@ -285,6 +288,7 @@ func main() {
|
|||
go ruleManager.Run()
|
||||
|
||||
notificationHandler := notification.NewNotificationHandler(*alertmanagerUrl, notifications)
|
||||
registry.MustRegister(notificationHandler)
|
||||
go notificationHandler.Run()
|
||||
|
||||
flags := map[string]string{}
|
||||
|
|
|
@ -1,54 +0,0 @@
|
|||
// Copyright 2013 Prometheus Team
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package notification
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
)
|
||||
|
||||
const (
|
||||
result = "result"
|
||||
success = "success"
|
||||
failure = "failure"
|
||||
dropped = "dropped"
|
||||
|
||||
facet = "facet"
|
||||
occupancy = "occupancy"
|
||||
capacity = "capacity"
|
||||
)
|
||||
|
||||
var (
|
||||
notificationsCount = prometheus.NewCounter()
|
||||
notificationLatency = prometheus.NewDefaultHistogram()
|
||||
notificationsQueueSize = prometheus.NewGauge()
|
||||
)
|
||||
|
||||
func recordOutcome(duration time.Duration, err error) {
|
||||
labels := map[string]string{result: success}
|
||||
if err != nil {
|
||||
labels[result] = failure
|
||||
}
|
||||
|
||||
notificationsCount.Increment(labels)
|
||||
ms := float64(duration / time.Millisecond)
|
||||
notificationLatency.Add(labels, ms)
|
||||
}
|
||||
|
||||
// init registers the notification counter, latency histogram and queue-size
// gauge with the prometheus client. Each call passes the exposed metric name,
// its help text, prometheus.NilLabels and the metric object itself.
func init() {
|
||||
prometheus.Register("prometheus_notifications_total", "Total number of processed alert notifications.", prometheus.NilLabels, notificationsCount)
|
||||
prometheus.Register("prometheus_notifications_latency_ms", "Latency quantiles for sending alert notifications in milliseconds.", prometheus.NilLabels, notificationLatency)
|
||||
prometheus.Register("prometheus_notifications_queue_size_total", "The size and capacity of the alert notification queue.", prometheus.NilLabels, notificationsQueueSize)
|
||||
}
|
|
@ -25,6 +25,7 @@ import (
|
|||
"github.com/golang/glog"
|
||||
|
||||
clientmodel "github.com/prometheus/client_golang/model"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
|
||||
"github.com/prometheus/prometheus/utility"
|
||||
)
|
||||
|
@ -34,6 +35,18 @@ const (
|
|||
contentTypeJson = "application/json"
|
||||
)
|
||||
|
||||
// String constants for instrumentation.
|
||||
const (
|
||||
result = "result"
|
||||
success = "success"
|
||||
failure = "failure"
|
||||
dropped = "dropped"
|
||||
|
||||
facet = "facet"
|
||||
occupancy = "occupancy"
|
||||
capacity = "capacity"
|
||||
)
|
||||
|
||||
var (
|
||||
deadline = flag.Duration("alertmanager.httpDeadline", 10*time.Second, "Alert manager HTTP API timeout.")
|
||||
)
|
||||
|
@ -72,6 +85,9 @@ type NotificationHandler struct {
|
|||
pendingNotifications <-chan NotificationReqs
|
||||
// HTTP client with custom timeout settings.
|
||||
httpClient httpPoster
|
||||
|
||||
notificationLatency *prometheus.SummaryVec
|
||||
notificationsQueueSize *prometheus.GaugeVec
|
||||
}
|
||||
|
||||
// Construct a new NotificationHandler.
|
||||
|
@ -80,6 +96,21 @@ func NewNotificationHandler(alertmanagerUrl string, notificationReqs <-chan Noti
|
|||
alertmanagerUrl: alertmanagerUrl,
|
||||
pendingNotifications: notificationReqs,
|
||||
httpClient: utility.NewDeadlineClient(*deadline),
|
||||
|
||||
notificationLatency: prometheus.NewSummaryVec(
|
||||
prometheus.SummaryOpts{
|
||||
Name: "prometheus_notifications_latency_ms",
|
||||
Help: "Latency quantiles for sending alert notifications in milliseconds.",
|
||||
},
|
||||
[]string{result},
|
||||
),
|
||||
notificationsQueueSize: prometheus.NewGaugeVec(
|
||||
prometheus.GaugeOpts{
|
||||
Name: "prometheus_notifications_queue_size_total",
|
||||
Help: "The size and capacity of the alert notification queue.",
|
||||
},
|
||||
[]string{facet},
|
||||
),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -122,35 +153,40 @@ func (n *NotificationHandler) sendNotifications(reqs NotificationReqs) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// Report notification queue occupancy and capacity.
|
||||
func (n *NotificationHandler) reportQueues() {
|
||||
notificationsQueueSize.Set(map[string]string{facet: occupancy}, float64(len(n.pendingNotifications)))
|
||||
notificationsQueueSize.Set(map[string]string{facet: capacity}, float64(cap(n.pendingNotifications)))
|
||||
}
|
||||
|
||||
// Continuously dispatch notifications.
|
||||
func (n *NotificationHandler) Run() {
|
||||
queueReportTicker := time.NewTicker(time.Second)
|
||||
go func() {
|
||||
for _ = range queueReportTicker.C {
|
||||
n.reportQueues()
|
||||
}
|
||||
}()
|
||||
defer queueReportTicker.Stop()
|
||||
|
||||
for reqs := range n.pendingNotifications {
|
||||
if n.alertmanagerUrl == "" {
|
||||
glog.Warning("No alert manager configured, not dispatching notification")
|
||||
notificationsCount.Increment(map[string]string{result: dropped})
|
||||
n.notificationLatency.WithLabelValues(dropped).Observe(0)
|
||||
continue
|
||||
}
|
||||
|
||||
begin := time.Now()
|
||||
err := n.sendNotifications(reqs)
|
||||
recordOutcome(time.Since(begin), err)
|
||||
labelValue := success
|
||||
|
||||
if err != nil {
|
||||
glog.Error("Error sending notification: ", err)
|
||||
labelValue = failure
|
||||
}
|
||||
|
||||
n.notificationLatency.WithLabelValues(labelValue).Observe(
|
||||
float64(time.Since(begin) / time.Millisecond),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
// Describe implements prometheus.Collector.
|
||||
func (n *NotificationHandler) Describe(ch chan<- *prometheus.Desc) {
|
||||
// Forward ch to the latency summary vector first, then to the queue-size
// gauge vector; the handler adds no descriptors of its own.
n.notificationLatency.Describe(ch)
|
||||
n.notificationsQueueSize.Describe(ch)
|
||||
}
|
||||
|
||||
// Collect implements prometheus.Collector.
|
||||
func (n *NotificationHandler) Collect(ch chan<- prometheus.Metric) {
|
||||
n.notificationLatency.Collect(ch)
|
||||
n.notificationsQueueSize.WithLabelValues(occupancy).Set(float64(len(n.pendingNotifications)))
|
||||
n.notificationsQueueSize.WithLabelValues(capacity).Set(float64(cap(n.pendingNotifications)))
|
||||
n.notificationsQueueSize.Collect(ch)
|
||||
}
|
||||
|
|
|
@ -1,61 +0,0 @@
|
|||
// Copyright 2013 Prometheus Team
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package retrieval
|
||||
|
||||
import (
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
)
|
||||
|
||||
const (
|
||||
address = "instance"
|
||||
alive = "alive"
|
||||
failure = "failure"
|
||||
outcome = "outcome"
|
||||
state = "state"
|
||||
success = "success"
|
||||
unreachable = "unreachable"
|
||||
)
|
||||
|
||||
var (
|
||||
networkLatencyHistogram = &prometheus.HistogramSpecification{
|
||||
Starts: prometheus.LogarithmicSizedBucketsFor(0, 1000),
|
||||
BucketBuilder: prometheus.AccumulatingBucketBuilder(prometheus.EvictAndReplaceWith(10, prometheus.AverageReducer), 100),
|
||||
ReportablePercentiles: []float64{0.01, 0.05, 0.5, 0.90, 0.99},
|
||||
}
|
||||
|
||||
targetOperationLatencies = prometheus.NewHistogram(networkLatencyHistogram)
|
||||
|
||||
retrievalDurations = prometheus.NewHistogram(&prometheus.HistogramSpecification{
|
||||
Starts: prometheus.LogarithmicSizedBucketsFor(0, 10000),
|
||||
BucketBuilder: prometheus.AccumulatingBucketBuilder(prometheus.EvictAndReplaceWith(10, prometheus.AverageReducer), 100),
|
||||
ReportablePercentiles: []float64{0.01, 0.05, 0.5, 0.90, 0.99}})
|
||||
|
||||
targetOperations = prometheus.NewCounter()
|
||||
dnsSDLookupsCount = prometheus.NewCounter()
|
||||
)
|
||||
|
||||
func recordOutcome(err error) {
|
||||
message := success
|
||||
if err != nil {
|
||||
message = failure
|
||||
}
|
||||
dnsSDLookupsCount.Increment(map[string]string{outcome: message})
|
||||
}
|
||||
|
||||
// init registers the retrieval metrics with the prometheus client: the target
// operation counter and latency histogram, the target pool duration histogram
// and the DNS-SD lookup counter. Each call passes the exposed metric name, its
// help text, prometheus.NilLabels and the metric object itself.
func init() {
|
||||
prometheus.Register("prometheus_target_operations_total", "The total numbers of operations of the various targets that are being monitored.", prometheus.NilLabels, targetOperations)
|
||||
prometheus.Register("prometheus_target_operation_latency_ms", "The latencies for various target operations.", prometheus.NilLabels, targetOperationLatencies)
|
||||
prometheus.Register("prometheus_targetpool_duration_ms", "The durations for each TargetPool to retrieve state from all included entities.", prometheus.NilLabels, retrievalDurations)
|
||||
prometheus.Register("prometheus_dns_sd_lookups_total", "The number of DNS-SD lookup successes/failures per pool.", prometheus.NilLabels, dnsSDLookupsCount)
|
||||
}
|
|
@ -23,6 +23,7 @@ import (
|
|||
|
||||
"github.com/golang/glog"
|
||||
"github.com/prometheus/client_golang/extraction"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
|
||||
clientmodel "github.com/prometheus/client_golang/model"
|
||||
|
||||
|
@ -33,9 +34,32 @@ const (
|
|||
InstanceLabel clientmodel.LabelName = "instance"
|
||||
// The metric name for the synthetic health variable.
|
||||
ScrapeHealthMetricName clientmodel.LabelValue = "up"
|
||||
|
||||
// Constants for instrumentation.
|
||||
address = "instance"
|
||||
alive = "alive"
|
||||
failure = "failure"
|
||||
outcome = "outcome"
|
||||
state = "state"
|
||||
success = "success"
|
||||
)
|
||||
|
||||
var localhostRepresentations = []string{"http://127.0.0.1", "http://localhost"}
|
||||
var (
|
||||
localhostRepresentations = []string{"http://127.0.0.1", "http://localhost"}
|
||||
|
||||
targetOperationLatencies = prometheus.NewSummaryVec(
|
||||
prometheus.SummaryOpts{
|
||||
Name: "prometheus_target_operation_latency_ms",
|
||||
Help: "The latencies for various target operations.",
|
||||
Objectives: []float64{0.01, 0.05, 0.5, 0.90, 0.99},
|
||||
},
|
||||
[]string{address, outcome},
|
||||
)
|
||||
)
|
||||
|
||||
// init registers the targetOperationLatencies summary vector with the
// prometheus client at package load.
// NOTE(review): MustRegister presumably panics when registration fails
// (Must-prefix convention) — confirm against client_golang.
func init() {
|
||||
prometheus.MustRegister(targetOperationLatencies)
|
||||
}
|
||||
|
||||
// The state of the given Target.
|
||||
type TargetState int
|
||||
|
@ -205,13 +229,12 @@ const acceptHeader = `application/vnd.google.protobuf;proto=io.prometheus.client
|
|||
func (t *target) scrape(timestamp clientmodel.Timestamp, ingester extraction.Ingester) (err error) {
|
||||
defer func(start time.Time) {
|
||||
ms := float64(time.Since(start)) / float64(time.Millisecond)
|
||||
labels := map[string]string{address: t.Address(), outcome: success}
|
||||
labels := prometheus.Labels{address: t.Address(), outcome: success}
|
||||
if err != nil {
|
||||
labels[outcome] = failure
|
||||
}
|
||||
|
||||
targetOperationLatencies.Add(labels, ms)
|
||||
targetOperations.Increment(labels)
|
||||
targetOperationLatencies.With(labels).Observe(ms)
|
||||
}(time.Now())
|
||||
|
||||
req, err := http.NewRequest("GET", t.Address(), nil)
|
||||
|
|
|
@ -22,6 +22,7 @@ import (
|
|||
|
||||
"github.com/golang/glog"
|
||||
"github.com/miekg/dns"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
|
||||
clientmodel "github.com/prometheus/client_golang/model"
|
||||
|
||||
|
@ -31,6 +32,20 @@ import (
|
|||
|
||||
const resolvConf = "/etc/resolv.conf"
|
||||
|
||||
var (
|
||||
dnsSDLookupsCount = prometheus.NewCounterVec(
|
||||
prometheus.CounterOpts{
|
||||
Name: "prometheus_dns_sd_lookups_total",
|
||||
Help: "The number of DNS-SD lookup successes/failures per pool.",
|
||||
},
|
||||
[]string{outcome},
|
||||
)
|
||||
)
|
||||
|
||||
// init registers the dnsSDLookupsCount counter vector with the prometheus
// client at package load.
// NOTE(review): MustRegister presumably panics when registration fails
// (Must-prefix convention) — confirm against client_golang.
func init() {
|
||||
prometheus.MustRegister(dnsSDLookupsCount)
|
||||
}
|
||||
|
||||
// TargetProvider encapsulates retrieving all targets for a job.
|
||||
type TargetProvider interface {
|
||||
// Retrieves the current list of targets for this provider.
|
||||
|
@ -60,7 +75,13 @@ func NewSdTargetProvider(job config.JobConfig) *sdTargetProvider {
|
|||
|
||||
func (p *sdTargetProvider) Targets() ([]Target, error) {
|
||||
var err error
|
||||
defer func() { recordOutcome(err) }()
|
||||
defer func() {
|
||||
message := success
|
||||
if err != nil {
|
||||
message = failure
|
||||
}
|
||||
dnsSDLookupsCount.WithLabelValues(message).Inc()
|
||||
}()
|
||||
|
||||
if time.Since(p.lastRefresh) < p.refreshInterval {
|
||||
return p.targets, nil
|
||||
|
|
|
@ -20,15 +20,31 @@ import (
|
|||
|
||||
"github.com/golang/glog"
|
||||
"github.com/prometheus/client_golang/extraction"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
)
|
||||
|
||||
const (
|
||||
intervalKey = "interval"
|
||||
|
||||
targetAddQueueSize = 100
|
||||
targetReplaceQueueSize = 1
|
||||
|
||||
intervalKey = "interval"
|
||||
)
|
||||
|
||||
var (
|
||||
retrievalDurations = prometheus.NewSummaryVec(
|
||||
prometheus.SummaryOpts{
|
||||
Name: "prometheus_targetpool_duration_ms",
|
||||
Help: "The durations for each TargetPool to retrieve state from all included entities.",
|
||||
Objectives: []float64{0.01, 0.05, 0.5, 0.90, 0.99},
|
||||
},
|
||||
[]string{intervalKey},
|
||||
)
|
||||
)
|
||||
|
||||
// init registers the retrievalDurations summary vector with the prometheus
// client at package load.
// NOTE(review): MustRegister presumably panics when registration fails
// (Must-prefix convention) — confirm against client_golang.
func init() {
|
||||
prometheus.MustRegister(retrievalDurations)
|
||||
}
|
||||
|
||||
type TargetPool struct {
|
||||
sync.RWMutex
|
||||
|
||||
|
@ -164,7 +180,7 @@ func (p *TargetPool) runIteration(ingester extraction.Ingester, interval time.Du
|
|||
wait.Wait()
|
||||
|
||||
duration := float64(time.Since(begin) / time.Millisecond)
|
||||
retrievalDurations.Add(map[string]string{intervalKey: interval.String()}, duration)
|
||||
retrievalDurations.WithLabelValues(interval.String()).Observe(duration)
|
||||
}
|
||||
|
||||
func (p *TargetPool) Targets() []Target {
|
||||
|
|
|
@ -20,6 +20,7 @@ import (
|
|||
|
||||
"github.com/golang/glog"
|
||||
"github.com/prometheus/client_golang/extraction"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
|
||||
clientmodel "github.com/prometheus/client_golang/model"
|
||||
|
||||
|
@ -30,6 +31,37 @@ import (
|
|||
"github.com/prometheus/prometheus/templates"
|
||||
)
|
||||
|
||||
// Constants for instrumentation.
|
||||
const (
|
||||
intervalLabel = "interval"
|
||||
ruleTypeLabel = "rule_type"
|
||||
alertingRuleType = "alerting"
|
||||
recordingRuleType = "recording"
|
||||
)
|
||||
|
||||
var (
|
||||
evalDuration = prometheus.NewSummaryVec(
|
||||
prometheus.SummaryOpts{
|
||||
Name: "prometheus_rule_evaluation_duration_ms",
|
||||
Help: "The duration for a rule to execute.",
|
||||
},
|
||||
[]string{ruleTypeLabel},
|
||||
)
|
||||
iterationDuration = prometheus.NewSummaryVec(
|
||||
prometheus.SummaryOpts{
|
||||
Name: "prometheus_evaluator_duration_ms",
|
||||
Help: "The duration for each evaluation pool to execute.",
|
||||
Objectives: []float64{0.01, 0.05, 0.5, 0.90, 0.99},
|
||||
},
|
||||
[]string{intervalLabel},
|
||||
)
|
||||
)
|
||||
|
||||
// init registers the two rule-manager summary vectors with the prometheus
// client at package load: iterationDuration first, then evalDuration.
// NOTE(review): MustRegister presumably panics when registration fails
// (Must-prefix convention) — confirm against client_golang.
func init() {
|
||||
prometheus.MustRegister(iterationDuration)
|
||||
prometheus.MustRegister(evalDuration)
|
||||
}
|
||||
|
||||
type RuleManager interface {
|
||||
// Load and add rules from rule files specified in the configuration.
|
||||
AddRulesFromConfig(config config.Config) error
|
||||
|
@ -92,7 +124,7 @@ func (m *ruleManager) Run() {
|
|||
case <-ticker.C:
|
||||
start := time.Now()
|
||||
m.runIteration(m.results)
|
||||
iterationDuration.Add(map[string]string{intervalLabel: m.interval.String()}, float64(time.Since(start)/time.Millisecond))
|
||||
iterationDuration.WithLabelValues(m.interval.String()).Observe(float64(time.Since(start) / time.Millisecond))
|
||||
case <-m.done:
|
||||
glog.Info("rules.Rule manager exiting...")
|
||||
return
|
||||
|
@ -190,9 +222,13 @@ func (m *ruleManager) runIteration(results chan<- *extraction.Result) {
|
|||
switch r := rule.(type) {
|
||||
case *rules.AlertingRule:
|
||||
m.queueAlertNotifications(r, now)
|
||||
recordOutcome(alertingRuleType, duration)
|
||||
evalDuration.WithLabelValues(alertingRuleType).Observe(
|
||||
float64(duration / time.Millisecond),
|
||||
)
|
||||
case *rules.RecordingRule:
|
||||
recordOutcome(recordingRuleType, duration)
|
||||
evalDuration.WithLabelValues(recordingRuleType).Observe(
|
||||
float64(duration / time.Millisecond),
|
||||
)
|
||||
default:
|
||||
panic(fmt.Sprintf("Unknown rule type: %T", rule))
|
||||
}
|
||||
|
|
|
@ -1,48 +0,0 @@
|
|||
// Copyright 2013 Prometheus Team
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package manager
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
)
|
||||
|
||||
const (
|
||||
intervalLabel = "interval"
|
||||
ruleTypeLabel = "rule_type"
|
||||
alertingRuleType = "alerting"
|
||||
recordingRuleType = "recording"
|
||||
)
|
||||
|
||||
var (
|
||||
evalDuration = prometheus.NewDefaultHistogram()
|
||||
evalCount = prometheus.NewCounter()
|
||||
iterationDuration = prometheus.NewHistogram(&prometheus.HistogramSpecification{
|
||||
Starts: prometheus.LogarithmicSizedBucketsFor(0, 10000),
|
||||
BucketBuilder: prometheus.AccumulatingBucketBuilder(prometheus.EvictAndReplaceWith(10, prometheus.AverageReducer), 100),
|
||||
ReportablePercentiles: []float64{0.01, 0.05, 0.5, 0.90, 0.99}})
|
||||
)
|
||||
|
||||
func recordOutcome(ruleType string, duration time.Duration) {
|
||||
millisecondDuration := float64(duration / time.Millisecond)
|
||||
evalCount.Increment(map[string]string{ruleTypeLabel: ruleType})
|
||||
evalDuration.Add(map[string]string{ruleTypeLabel: ruleType}, millisecondDuration)
|
||||
}
|
||||
|
||||
// init registers the rule-manager metrics with the prometheus client: the
// per-iteration duration histogram, the per-rule evaluation duration histogram
// and the rule evaluation counter. Each call passes the exposed metric name,
// its help text, prometheus.NilLabels and the metric object itself.
func init() {
|
||||
prometheus.Register("prometheus_evaluator_duration_ms", "The duration for each evaluation pool to execute.", prometheus.NilLabels, iterationDuration)
|
||||
prometheus.Register("prometheus_rule_evaluation_duration_ms", "The duration for a rule to execute.", prometheus.NilLabels, evalDuration)
|
||||
prometheus.Register("prometheus_rule_evaluation_count", "The number of rules evaluated.", prometheus.NilLabels, evalCount)
|
||||
}
|
|
@ -22,6 +22,7 @@ import (
|
|||
|
||||
"code.google.com/p/goprotobuf/proto"
|
||||
"github.com/golang/glog"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
|
||||
clientmodel "github.com/prometheus/client_golang/model"
|
||||
|
||||
|
@ -37,6 +38,35 @@ const curationYieldPeriod = 250 * time.Millisecond
|
|||
|
||||
var errIllegalIterator = errors.New("iterator invalid")
|
||||
|
||||
// Constants for instrumentation.
|
||||
const (
|
||||
cutOff = "recency_threshold"
|
||||
processorName = "processor"
|
||||
)
|
||||
|
||||
var (
|
||||
curationDurations = prometheus.NewSummaryVec(
|
||||
prometheus.SummaryOpts{
|
||||
Name: "prometheus_curation_durations_ms",
|
||||
Help: "Histogram of time spent in curation (ms).",
|
||||
Objectives: []float64{0.01, 0.05, 0.5, 0.90, 0.99},
|
||||
},
|
||||
[]string{cutOff, processorName, result},
|
||||
)
|
||||
curationFilterOperations = prometheus.NewCounterVec(
|
||||
prometheus.CounterOpts{
|
||||
Name: "prometheus_curation_filter_operations_total",
|
||||
Help: "The number of curation filter operations completed.",
|
||||
},
|
||||
[]string{cutOff, processorName, result},
|
||||
)
|
||||
)
|
||||
|
||||
// init registers the two curation metrics with the prometheus client at
// package load: the curationDurations summary vector first, then the
// curationFilterOperations counter vector.
// NOTE(review): MustRegister presumably panics when registration fails
// (Must-prefix convention) — confirm against client_golang.
func init() {
|
||||
prometheus.MustRegister(curationDurations)
|
||||
prometheus.MustRegister(curationFilterOperations)
|
||||
}
|
||||
|
||||
// CurationStateUpdater receives updates about the curation state.
|
||||
type CurationStateUpdater interface {
|
||||
UpdateCurationState(*metric.CurationState)
|
||||
|
@ -123,7 +153,7 @@ func (c *Curator) Run(ignoreYoungerThan time.Duration, instant clientmodel.Times
|
|||
defer func(t time.Time) {
|
||||
duration := float64(time.Since(t) / time.Millisecond)
|
||||
|
||||
labels := map[string]string{
|
||||
labels := prometheus.Labels{
|
||||
cutOff: fmt.Sprint(ignoreYoungerThan),
|
||||
processorName: processor.Name(),
|
||||
result: success,
|
||||
|
@ -132,8 +162,7 @@ func (c *Curator) Run(ignoreYoungerThan time.Duration, instant clientmodel.Times
|
|||
labels[result] = failure
|
||||
}
|
||||
|
||||
curationDuration.IncrementBy(labels, duration)
|
||||
curationDurations.Add(labels, duration)
|
||||
curationDurations.With(labels).Observe(duration)
|
||||
}(time.Now())
|
||||
|
||||
defer status.UpdateCurationState(&metric.CurationState{Active: false})
|
||||
|
@ -255,13 +284,13 @@ func (w *watermarkScanner) Filter(key, value interface{}) (r storage.FilterResul
|
|||
fingerprint := key.(*clientmodel.Fingerprint)
|
||||
|
||||
defer func() {
|
||||
labels := map[string]string{
|
||||
labels := prometheus.Labels{
|
||||
cutOff: fmt.Sprint(w.ignoreYoungerThan),
|
||||
result: strings.ToLower(r.String()),
|
||||
processorName: w.processor.Name(),
|
||||
}
|
||||
|
||||
curationFilterOperations.Increment(labels)
|
||||
curationFilterOperations.With(labels).Inc()
|
||||
|
||||
w.status.UpdateCurationState(&metric.CurationState{
|
||||
Active: true,
|
||||
|
|
|
@ -1,80 +0,0 @@
|
|||
// Copyright 2013 Prometheus Team
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package tiered
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
)
|
||||
|
||||
const (
|
||||
operation = "operation"
|
||||
success = "success"
|
||||
failure = "failure"
|
||||
result = "result"
|
||||
|
||||
appendSample = "append_sample"
|
||||
appendSamples = "append_samples"
|
||||
flushMemory = "flush_memory"
|
||||
getLabelValuesForLabelName = "get_label_values_for_label_name"
|
||||
getFingerprintsForLabelMatchers = "get_fingerprints_for_label_matchers"
|
||||
getMetricForFingerprint = "get_metric_for_fingerprint"
|
||||
hasIndexMetric = "has_index_metric"
|
||||
refreshHighWatermarks = "refresh_high_watermarks"
|
||||
renderView = "render_view"
|
||||
|
||||
cutOff = "recency_threshold"
|
||||
processorName = "processor"
|
||||
)
|
||||
|
||||
var (
|
||||
diskLatencyHistogram = &prometheus.HistogramSpecification{
|
||||
Starts: prometheus.LogarithmicSizedBucketsFor(0, 5000),
|
||||
BucketBuilder: prometheus.AccumulatingBucketBuilder(prometheus.EvictAndReplaceWith(10, prometheus.AverageReducer), 100),
|
||||
ReportablePercentiles: []float64{0.01, 0.05, 0.5, 0.90, 0.99},
|
||||
}
|
||||
|
||||
curationDuration = prometheus.NewCounter()
|
||||
curationDurations = prometheus.NewHistogram(diskLatencyHistogram)
|
||||
curationFilterOperations = prometheus.NewCounter()
|
||||
storageOperations = prometheus.NewCounter()
|
||||
storageOperationDurations = prometheus.NewCounter()
|
||||
storageLatency = prometheus.NewHistogram(diskLatencyHistogram)
|
||||
queueSizes = prometheus.NewGauge()
|
||||
storedSamplesCount = prometheus.NewCounter()
|
||||
)
|
||||
|
||||
func recordOutcome(duration time.Duration, err error, success, failure map[string]string) {
|
||||
labels := success
|
||||
if err != nil {
|
||||
labels = failure
|
||||
}
|
||||
|
||||
storageOperations.Increment(labels)
|
||||
asFloat := float64(duration / time.Microsecond)
|
||||
storageLatency.Add(labels, asFloat)
|
||||
storageOperationDurations.IncrementBy(labels, asFloat)
|
||||
}
|
||||
|
||||
// init registers the tiered-storage metrics with the prometheus client: disk
// operation count, latency and total time, queue sizes, the three curation
// metrics and the stored-samples counter. Each call passes the exposed metric
// name, its help text, prometheus.NilLabels and the metric object itself.
func init() {
|
||||
prometheus.Register("prometheus_metric_disk_operations_total", "Total number of metric-related disk operations.", prometheus.NilLabels, storageOperations)
|
||||
prometheus.Register("prometheus_metric_disk_latency_microseconds", "Latency for metric disk operations in microseconds.", prometheus.NilLabels, storageLatency)
|
||||
prometheus.Register("prometheus_storage_operation_time_total_microseconds", "The total time spent performing a given storage operation.", prometheus.NilLabels, storageOperationDurations)
|
||||
prometheus.Register("prometheus_storage_queue_sizes_total", "The various sizes and capacities of the storage queues.", prometheus.NilLabels, queueSizes)
|
||||
prometheus.Register("prometheus_curation_filter_operations_total", "The number of curation filter operations completed.", prometheus.NilLabels, curationFilterOperations)
|
||||
prometheus.Register("prometheus_curation_duration_ms_total", "The total time spent in curation (ms).", prometheus.NilLabels, curationDuration)
|
||||
prometheus.Register("prometheus_curation_durations_ms", "Histogram of time spent in curation (ms).", prometheus.NilLabels, curationDurations)
|
||||
prometheus.Register("prometheus_stored_samples_total", "The number of samples that have been stored.", prometheus.NilLabels, storedSamplesCount)
|
||||
}
|
|
@ -24,6 +24,7 @@ import (
|
|||
"github.com/golang/glog"
|
||||
|
||||
clientmodel "github.com/prometheus/client_golang/model"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
|
||||
"github.com/prometheus/prometheus/storage"
|
||||
"github.com/prometheus/prometheus/storage/metric"
|
||||
|
@ -249,9 +250,7 @@ func NewLevelDBPersistence(baseDirectory string) (*LevelDBPersistence, error) {
|
|||
// AppendSample implements the Persistence interface.
|
||||
func (l *LevelDBPersistence) AppendSample(sample *clientmodel.Sample) (err error) {
|
||||
defer func(begin time.Time) {
|
||||
duration := time.Since(begin)
|
||||
|
||||
recordOutcome(duration, err, map[string]string{operation: appendSample, result: success}, map[string]string{operation: appendSample, result: failure})
|
||||
recordOutcome(time.Since(begin), err, appendSample)
|
||||
}(time.Now())
|
||||
|
||||
err = l.AppendSamples(clientmodel.Samples{sample})
|
||||
|
@ -295,9 +294,7 @@ func groupByFingerprint(samples clientmodel.Samples) map[clientmodel.Fingerprint
|
|||
|
||||
func (l *LevelDBPersistence) refreshHighWatermarks(groups map[clientmodel.Fingerprint]clientmodel.Samples) (err error) {
|
||||
defer func(begin time.Time) {
|
||||
duration := time.Since(begin)
|
||||
|
||||
recordOutcome(duration, err, map[string]string{operation: refreshHighWatermarks, result: success}, map[string]string{operation: refreshHighWatermarks, result: failure})
|
||||
recordOutcome(time.Since(begin), err, refreshHighWatermarks)
|
||||
}(time.Now())
|
||||
|
||||
b := FingerprintHighWatermarkMapping{}
|
||||
|
@ -315,9 +312,7 @@ func (l *LevelDBPersistence) refreshHighWatermarks(groups map[clientmodel.Finger
|
|||
// AppendSamples appends the given Samples to the database and indexes them.
|
||||
func (l *LevelDBPersistence) AppendSamples(samples clientmodel.Samples) (err error) {
|
||||
defer func(begin time.Time) {
|
||||
duration := time.Since(begin)
|
||||
|
||||
recordOutcome(duration, err, map[string]string{operation: appendSamples, result: success}, map[string]string{operation: appendSamples, result: failure})
|
||||
recordOutcome(time.Since(begin), err, appendSamples)
|
||||
}(time.Now())
|
||||
|
||||
fingerprintToSamples := groupByFingerprint(samples)
|
||||
|
@ -412,9 +407,7 @@ func extractSampleKey(i leveldb.Iterator) (*SampleKey, error) {
|
|||
|
||||
func (l *LevelDBPersistence) hasIndexMetric(m clientmodel.Metric) (value bool, err error) {
|
||||
defer func(begin time.Time) {
|
||||
duration := time.Since(begin)
|
||||
|
||||
recordOutcome(duration, err, map[string]string{operation: hasIndexMetric, result: success}, map[string]string{operation: hasIndexMetric, result: failure})
|
||||
recordOutcome(time.Since(begin), err, hasIndexMetric)
|
||||
}(time.Now())
|
||||
|
||||
return l.MetricMembershipIndex.Has(m)
|
||||
|
@ -426,9 +419,7 @@ func (l *LevelDBPersistence) hasIndexMetric(m clientmodel.Metric) (value bool, e
|
|||
// Persistence interface.
|
||||
func (l *LevelDBPersistence) GetFingerprintsForLabelMatchers(labelMatchers metric.LabelMatchers) (fps clientmodel.Fingerprints, err error) {
|
||||
defer func(begin time.Time) {
|
||||
duration := time.Since(begin)
|
||||
|
||||
recordOutcome(duration, err, map[string]string{operation: getFingerprintsForLabelMatchers, result: success}, map[string]string{operation: getFingerprintsForLabelMatchers, result: failure})
|
||||
recordOutcome(time.Since(begin), err, getFingerprintsForLabelMatchers)
|
||||
}(time.Now())
|
||||
|
||||
sets := []utility.Set{}
|
||||
|
@ -497,9 +488,7 @@ func (l *LevelDBPersistence) GetFingerprintsForLabelMatchers(labelMatchers metri
|
|||
func (l *LevelDBPersistence) GetLabelValuesForLabelName(labelName clientmodel.LabelName) (clientmodel.LabelValues, error) {
|
||||
var err error
|
||||
defer func(begin time.Time) {
|
||||
duration := time.Since(begin)
|
||||
|
||||
recordOutcome(duration, err, map[string]string{operation: getLabelValuesForLabelName, result: success}, map[string]string{operation: getLabelValuesForLabelName, result: failure})
|
||||
recordOutcome(time.Since(begin), err, getLabelValuesForLabelName)
|
||||
}(time.Now())
|
||||
|
||||
values, _, err := l.LabelNameToLabelValues.Lookup(labelName)
|
||||
|
@ -512,9 +501,7 @@ func (l *LevelDBPersistence) GetLabelValuesForLabelName(labelName clientmodel.La
|
|||
// interface.
|
||||
func (l *LevelDBPersistence) GetMetricForFingerprint(f *clientmodel.Fingerprint) (m clientmodel.Metric, err error) {
|
||||
defer func(begin time.Time) {
|
||||
duration := time.Since(begin)
|
||||
|
||||
recordOutcome(duration, err, map[string]string{operation: getMetricForFingerprint, result: success}, map[string]string{operation: getMetricForFingerprint, result: failure})
|
||||
recordOutcome(time.Since(begin), err, getMetricForFingerprint)
|
||||
}(time.Now())
|
||||
|
||||
// TODO(matt): Update signature to work with ok.
|
||||
|
@ -697,3 +684,13 @@ func (f LabelNameFilter) Filter(key, value interface{}) (filterResult storage.Fi
|
|||
}
|
||||
return storage.Skip
|
||||
}
|
||||
|
||||
func recordOutcome(duration time.Duration, err error, op string) {
|
||||
labels := prometheus.Labels{operation: op}
|
||||
if err == nil {
|
||||
labels[result] = success
|
||||
} else {
|
||||
labels[result] = failure
|
||||
}
|
||||
storageLatency.With(labels).Observe(float64(duration / time.Microsecond))
|
||||
}
|
||||
|
|
|
@ -21,9 +21,9 @@ import (
|
|||
"time"
|
||||
|
||||
"github.com/golang/glog"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
|
||||
clientmodel "github.com/prometheus/client_golang/model"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
|
||||
"github.com/prometheus/prometheus/stats"
|
||||
"github.com/prometheus/prometheus/storage/metric"
|
||||
|
@ -31,6 +31,52 @@ import (
|
|||
"github.com/prometheus/prometheus/utility"
|
||||
)
|
||||
|
||||
// Label names and label values used by the storage instrumentation.
const (
	// Label names of the storageLatency summary vector.
	operation = "operation"
	result    = "result"

	// Values of the result label.
	success = "success"
	failure = "failure"

	// Operation names, used as values of the operation label.
	appendSample                    = "append_sample"
	appendSamples                   = "append_samples"
	flushMemory                     = "flush_memory"
	getLabelValuesForLabelName      = "get_label_values_for_label_name"
	getFingerprintsForLabelMatchers = "get_fingerprints_for_label_matchers"
	getMetricForFingerprint         = "get_metric_for_fingerprint"
	hasIndexMetric                  = "has_index_metric"
	refreshHighWatermarks           = "refresh_high_watermarks"
	renderView                      = "render_view"

	// Label names of the queue size gauge vector.
	queue = "queue"
	facet = "facet"

	// Values of the queue label.
	appendToDisk   = "append_to_disk"
	viewGeneration = "view_generation"

	// Values of the facet label.
	occupancy = "occupancy"
	capacity  = "capacity"
)
|
||||
|
||||
var (
|
||||
storageLatency = prometheus.NewSummaryVec(
|
||||
prometheus.SummaryOpts{
|
||||
Name: "prometheus_metric_disk_latency_microseconds",
|
||||
Help: "Latency for metric disk operations in microseconds.",
|
||||
Objectives: []float64{0.01, 0.05, 0.5, 0.90, 0.99},
|
||||
},
|
||||
[]string{operation, result},
|
||||
)
|
||||
storedSamplesCount = prometheus.NewCounter(prometheus.CounterOpts{
|
||||
Name: "prometheus_stored_samples_total",
|
||||
Help: "The number of samples that have been stored.",
|
||||
})
|
||||
)
|
||||
|
||||
func init() {
|
||||
prometheus.MustRegister(storageLatency)
|
||||
prometheus.MustRegister(storedSamplesCount)
|
||||
}
|
||||
|
||||
type chunk metric.Values
|
||||
|
||||
// TruncateBefore returns a subslice of the original such that extraneous
|
||||
|
@ -98,6 +144,8 @@ type TieredStorage struct {
|
|||
|
||||
dtoSampleKeys *dtoSampleKeyList
|
||||
sampleKeys *sampleKeyList
|
||||
|
||||
queueSizes *prometheus.GaugeVec
|
||||
}
|
||||
|
||||
// viewJob encapsulates a request to extract sample values from the datastore.
|
||||
|
@ -159,6 +207,14 @@ func NewTieredStorage(
|
|||
|
||||
dtoSampleKeys: newDtoSampleKeyList(10),
|
||||
sampleKeys: newSampleKeyList(10),
|
||||
|
||||
queueSizes: prometheus.NewGaugeVec(
|
||||
prometheus.GaugeOpts{
|
||||
Name: "prometheus_storage_queue_sizes_total",
|
||||
Help: "The various sizes and capacities of the storage queues.",
|
||||
},
|
||||
[]string{queue, facet},
|
||||
),
|
||||
}
|
||||
|
||||
for i := 0; i < tieredMemorySemaphores; i++ {
|
||||
|
@ -177,7 +233,7 @@ func (t *TieredStorage) AppendSamples(samples clientmodel.Samples) (err error) {
|
|||
}
|
||||
|
||||
t.memoryArena.AppendSamples(samples)
|
||||
storedSamplesCount.IncrementBy(prometheus.NilLabels, float64(len(samples)))
|
||||
storedSamplesCount.Add(float64(len(samples)))
|
||||
|
||||
return
|
||||
}
|
||||
|
@ -257,14 +313,6 @@ func (t *TieredStorage) Serve(started chan<- bool) {
|
|||
|
||||
flushMemoryTicker := time.NewTicker(t.flushMemoryInterval)
|
||||
defer flushMemoryTicker.Stop()
|
||||
queueReportTicker := time.NewTicker(time.Second)
|
||||
defer queueReportTicker.Stop()
|
||||
|
||||
go func() {
|
||||
for _ = range queueReportTicker.C {
|
||||
t.reportQueues()
|
||||
}
|
||||
}()
|
||||
|
||||
started <- true
|
||||
for {
|
||||
|
@ -292,14 +340,6 @@ func (t *TieredStorage) Serve(started chan<- bool) {
|
|||
}
|
||||
}
|
||||
|
||||
func (t *TieredStorage) reportQueues() {
|
||||
queueSizes.Set(map[string]string{"queue": "append_to_disk", "facet": "occupancy"}, float64(len(t.appendToDiskQueue)))
|
||||
queueSizes.Set(map[string]string{"queue": "append_to_disk", "facet": "capacity"}, float64(cap(t.appendToDiskQueue)))
|
||||
|
||||
queueSizes.Set(map[string]string{"queue": "view_generation", "facet": "occupancy"}, float64(len(t.ViewQueue)))
|
||||
queueSizes.Set(map[string]string{"queue": "view_generation", "facet": "capacity"}, float64(cap(t.ViewQueue)))
|
||||
}
|
||||
|
||||
// Flush flushes all samples to disk.
|
||||
func (t *TieredStorage) Flush() {
|
||||
t.flushSema <- true
|
||||
|
@ -399,15 +439,19 @@ func (t *TieredStorage) renderView(viewJob viewJob) {
|
|||
begin := time.Now()
|
||||
defer func() {
|
||||
t.memorySemaphore <- true
|
||||
|
||||
duration := time.Since(begin)
|
||||
|
||||
recordOutcome(
|
||||
duration,
|
||||
err,
|
||||
map[string]string{operation: renderView, result: success},
|
||||
map[string]string{operation: renderView, result: failure},
|
||||
)
|
||||
if err == nil {
|
||||
storageLatency.With(
|
||||
prometheus.Labels{operation: renderView, result: success},
|
||||
).Observe(
|
||||
float64(time.Since(begin) / time.Microsecond),
|
||||
)
|
||||
} else {
|
||||
storageLatency.With(
|
||||
prometheus.Labels{operation: renderView, result: failure},
|
||||
).Observe(
|
||||
float64(time.Since(begin) / time.Microsecond),
|
||||
)
|
||||
}
|
||||
}()
|
||||
|
||||
view := newView()
|
||||
|
@ -740,3 +784,26 @@ func (t *TieredStorage) GetMetricForFingerprint(f *clientmodel.Fingerprint) (cli
|
|||
}
|
||||
return m, err
|
||||
}
|
||||
|
||||
// Describe implements prometheus.Collector.
|
||||
func (t *TieredStorage) Describe(ch chan<- *prometheus.Desc) {
|
||||
t.queueSizes.Describe(ch)
|
||||
}
|
||||
|
||||
// Collect implements prometheus.Collector.
|
||||
func (t *TieredStorage) Collect(ch chan<- prometheus.Metric) {
|
||||
t.queueSizes.With(prometheus.Labels{
|
||||
queue: appendToDisk, facet: occupancy,
|
||||
}).Set(float64(len(t.appendToDiskQueue)))
|
||||
t.queueSizes.With(prometheus.Labels{
|
||||
queue: appendToDisk, facet: capacity,
|
||||
}).Set(float64(cap(t.appendToDiskQueue)))
|
||||
t.queueSizes.With(prometheus.Labels{
|
||||
queue: viewGeneration, facet: occupancy,
|
||||
}).Set(float64(len(t.ViewQueue)))
|
||||
t.queueSizes.With(prometheus.Labels{
|
||||
queue: viewGeneration, facet: capacity,
|
||||
}).Set(float64(cap(t.ViewQueue)))
|
||||
|
||||
t.queueSizes.Collect(ch)
|
||||
}
|
||||
|
|
|
@ -1,55 +0,0 @@
|
|||
// Copyright 2013 Prometheus Team
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package remote
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
)
|
||||
|
||||
const (
|
||||
result = "result"
|
||||
success = "success"
|
||||
failure = "failure"
|
||||
dropped = "dropped"
|
||||
|
||||
facet = "facet"
|
||||
occupancy = "occupancy"
|
||||
capacity = "capacity"
|
||||
)
|
||||
|
||||
var (
|
||||
samplesCount = prometheus.NewCounter()
|
||||
sendLatency = prometheus.NewDefaultHistogram()
|
||||
queueSize = prometheus.NewGauge()
|
||||
)
|
||||
|
||||
func recordOutcome(duration time.Duration, sampleCount int, err error) {
|
||||
labels := map[string]string{result: success}
|
||||
if err != nil {
|
||||
labels[result] = failure
|
||||
}
|
||||
|
||||
samplesCount.IncrementBy(labels, float64(sampleCount))
|
||||
ms := float64(duration / time.Millisecond)
|
||||
sendLatency.Add(labels, ms)
|
||||
}
|
||||
|
||||
func init() {
|
||||
prometheus.Register("prometheus_remote_tsdb_sent_samples_total", "Total number of samples processed to be sent to remote TSDB.", prometheus.NilLabels, samplesCount)
|
||||
|
||||
prometheus.Register("prometheus_remote_tsdb_latency_ms", "Latency quantiles for sending samples to the remote TSDB in milliseconds.", prometheus.NilLabels, sendLatency)
|
||||
prometheus.Register("prometheus_remote_tsdb_queue_size_total", "The size and capacity of the queue of samples to be sent to the remote TSDB.", prometheus.NilLabels, queueSize)
|
||||
}
|
|
@ -19,6 +19,7 @@ import (
|
|||
"github.com/golang/glog"
|
||||
|
||||
clientmodel "github.com/prometheus/client_golang/model"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
)
|
||||
|
||||
const (
|
||||
|
@ -31,6 +32,18 @@ const (
|
|||
batchSendDeadline = 5 * time.Second
|
||||
)
|
||||
|
||||
// Label names and label values used by the TSDB queue manager
// instrumentation.
const (
	// result is the label name partitioning samples and send latencies by
	// outcome; success, failure and dropped are its values.
	result  = "result"
	success = "success"
	failure = "failure"
	dropped = "dropped"

	// facet is the label name of the queue size gauge vector; occupancy and
	// capacity are its values.
	facet     = "facet"
	occupancy = "occupancy"
	capacity  = "capacity"
)
|
||||
|
||||
// TSDBClient defines an interface for sending a batch of samples to an
|
||||
// external timeseries database (TSDB).
|
||||
type TSDBClient interface {
|
||||
|
@ -45,6 +58,10 @@ type TSDBQueueManager struct {
|
|||
pendingSamples clientmodel.Samples
|
||||
sendSemaphore chan bool
|
||||
drained chan bool
|
||||
|
||||
samplesCount *prometheus.CounterVec
|
||||
sendLatency *prometheus.SummaryVec
|
||||
queueSize *prometheus.GaugeVec
|
||||
}
|
||||
|
||||
// NewTSDBQueueManager builds a new TSDBQueueManager.
|
||||
|
@ -54,6 +71,28 @@ func NewTSDBQueueManager(tsdb TSDBClient, queueCapacity int) *TSDBQueueManager {
|
|||
queue: make(chan clientmodel.Samples, queueCapacity),
|
||||
sendSemaphore: make(chan bool, maxConcurrentSends),
|
||||
drained: make(chan bool),
|
||||
|
||||
samplesCount: prometheus.NewCounterVec(
|
||||
prometheus.CounterOpts{
|
||||
Name: "prometheus_remote_tsdb_sent_samples_total",
|
||||
Help: "Total number of samples processed to be sent to remote TSDB.",
|
||||
},
|
||||
[]string{result},
|
||||
),
|
||||
sendLatency: prometheus.NewSummaryVec(
|
||||
prometheus.SummaryOpts{
|
||||
Name: "prometheus_remote_tsdb_latency_ms",
|
||||
Help: "Latency quantiles for sending samples to the remote TSDB in milliseconds.",
|
||||
},
|
||||
[]string{result},
|
||||
),
|
||||
queueSize: prometheus.NewGaugeVec(
|
||||
prometheus.GaugeOpts{
|
||||
Name: "prometheus_remote_tsdb_queue_size_total",
|
||||
Help: "The size and capacity of the queue of samples to be sent to the remote TSDB.",
|
||||
},
|
||||
[]string{facet},
|
||||
),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -63,11 +102,38 @@ func (t *TSDBQueueManager) Queue(s clientmodel.Samples) {
|
|||
select {
|
||||
case t.queue <- s:
|
||||
default:
|
||||
samplesCount.IncrementBy(map[string]string{result: dropped}, float64(len(s)))
|
||||
t.samplesCount.WithLabelValues(dropped).Add(float64(len(s)))
|
||||
glog.Warningf("TSDB queue full, discarding %d samples", len(s))
|
||||
}
|
||||
}
|
||||
|
||||
// Close stops sending samples to the TSDB and waits for pending sends to
|
||||
// complete.
|
||||
func (t *TSDBQueueManager) Close() {
|
||||
glog.Infof("TSDB queue manager shutting down...")
|
||||
close(t.queue)
|
||||
<-t.drained
|
||||
for i := 0; i < maxConcurrentSends; i++ {
|
||||
t.sendSemaphore <- true
|
||||
}
|
||||
}
|
||||
|
||||
// Describe implements prometheus.Collector.
|
||||
func (t *TSDBQueueManager) Describe(ch chan<- *prometheus.Desc) {
|
||||
t.samplesCount.Describe(ch)
|
||||
t.sendLatency.Describe(ch)
|
||||
t.queueSize.Describe(ch)
|
||||
}
|
||||
|
||||
// Collect implements prometheus.Collector.
|
||||
func (t *TSDBQueueManager) Collect(ch chan<- prometheus.Metric) {
|
||||
t.samplesCount.Collect(ch)
|
||||
t.sendLatency.Collect(ch)
|
||||
t.queueSize.WithLabelValues(occupancy).Set(float64(len(t.queue)))
|
||||
t.queueSize.WithLabelValues(capacity).Set(float64(cap(t.queue)))
|
||||
t.queueSize.Collect(ch)
|
||||
}
|
||||
|
||||
func (t *TSDBQueueManager) sendSamples(s clientmodel.Samples) {
|
||||
t.sendSemaphore <- true
|
||||
defer func() {
|
||||
|
@ -78,17 +144,15 @@ func (t *TSDBQueueManager) sendSamples(s clientmodel.Samples) {
|
|||
// sent correctly the first time, it's simply dropped on the floor.
|
||||
begin := time.Now()
|
||||
err := t.tsdb.Store(s)
|
||||
recordOutcome(time.Since(begin), len(s), err)
|
||||
duration := time.Since(begin) / time.Millisecond
|
||||
|
||||
labelValue := success
|
||||
if err != nil {
|
||||
glog.Warningf("error sending %d samples to TSDB: %s", len(s), err)
|
||||
labelValue = failure
|
||||
}
|
||||
}
|
||||
|
||||
// reportQueues reports notification queue occupancy and capacity.
|
||||
func (t *TSDBQueueManager) reportQueues() {
|
||||
queueSize.Set(map[string]string{facet: occupancy}, float64(len(t.queue)))
|
||||
queueSize.Set(map[string]string{facet: capacity}, float64(cap(t.queue)))
|
||||
t.samplesCount.WithLabelValues(labelValue).Add(float64(len(s)))
|
||||
t.sendLatency.WithLabelValues(labelValue).Observe(float64(duration))
|
||||
}
|
||||
|
||||
// Run continuously sends samples to the TSDB.
|
||||
|
@ -97,14 +161,6 @@ func (t *TSDBQueueManager) Run() {
|
|||
close(t.drained)
|
||||
}()
|
||||
|
||||
queueReportTicker := time.NewTicker(time.Second)
|
||||
go func() {
|
||||
for _ = range queueReportTicker.C {
|
||||
t.reportQueues()
|
||||
}
|
||||
}()
|
||||
defer queueReportTicker.Stop()
|
||||
|
||||
// Send batches of at most maxSamplesPerSend samples to the TSDB. If we
|
||||
// have fewer samples than that, flush them out after a deadline anyways.
|
||||
for {
|
||||
|
@ -136,14 +192,3 @@ func (t *TSDBQueueManager) flush() {
|
|||
}
|
||||
t.pendingSamples = t.pendingSamples[:0]
|
||||
}
|
||||
|
||||
// Close stops sending samples to the TSDB and waits for pending sends to
|
||||
// complete.
|
||||
func (t *TSDBQueueManager) Close() {
|
||||
glog.Infof("TSDB queue manager shutting down...")
|
||||
close(t.queue)
|
||||
<-t.drained
|
||||
for i := 0; i < maxConcurrentSends; i++ {
|
||||
t.sendSemaphore <- true
|
||||
}
|
||||
}
|
||||
|
|
|
@ -15,8 +15,7 @@ package api
|
|||
|
||||
import (
|
||||
"net/http"
|
||||
|
||||
"github.com/prometheus/client_golang/prometheus/exp"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
|
||||
"github.com/prometheus/prometheus/config"
|
||||
"github.com/prometheus/prometheus/retrieval"
|
||||
|
@ -38,8 +37,16 @@ func (msrv *MetricsService) RegisterHandler() {
|
|||
Handler: http.HandlerFunc(h),
|
||||
}
|
||||
}
|
||||
exp.Handle("/api/query", handler(msrv.Query))
|
||||
exp.Handle("/api/query_range", handler(msrv.QueryRange))
|
||||
exp.Handle("/api/metrics", handler(msrv.Metrics))
|
||||
exp.Handle("/api/targets", handler(msrv.SetTargets))
|
||||
http.Handle("/api/query", prometheus.InstrumentHandler(
|
||||
"/api/query", handler(msrv.Query),
|
||||
))
|
||||
http.Handle("/api/query_range", prometheus.InstrumentHandler(
|
||||
"/api/query_range", handler(msrv.QueryRange),
|
||||
))
|
||||
http.Handle("/api/metrics", prometheus.InstrumentHandler(
|
||||
"/api/metrics", handler(msrv.Metrics),
|
||||
))
|
||||
http.Handle("/api/targets", prometheus.InstrumentHandler(
|
||||
"/api/targets", handler(msrv.SetTargets),
|
||||
))
|
||||
}
|
||||
|
|
54
web/web.go
54
web/web.go
|
@ -19,7 +19,7 @@ import (
|
|||
"html/template"
|
||||
"net"
|
||||
"net/http"
|
||||
"net/http/pprof"
|
||||
_ "net/http/pprof"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
|
@ -27,7 +27,6 @@ import (
|
|||
|
||||
"github.com/golang/glog"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/prometheus/client_golang/prometheus/exp"
|
||||
|
||||
"github.com/prometheus/prometheus/web/api"
|
||||
"github.com/prometheus/prometheus/web/blob"
|
||||
|
@ -52,43 +51,54 @@ type WebService struct {
|
|||
}
|
||||
|
||||
func (w WebService) ServeForever() error {
|
||||
exp.Handle("/favicon.ico", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
http.Handle("/favicon.ico", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
http.Error(w, "", 404)
|
||||
}))
|
||||
|
||||
// TODO(julius): This will need to be rewritten once the exp package provides
|
||||
// the coarse mux behaviors via a wrapper function.
|
||||
exp.Handle("/debug/pprof/", http.HandlerFunc(pprof.Index))
|
||||
exp.Handle("/debug/pprof/cmdline", http.HandlerFunc(pprof.Cmdline))
|
||||
exp.Handle("/debug/pprof/profile", http.HandlerFunc(pprof.Profile))
|
||||
exp.Handle("/debug/pprof/symbol", http.HandlerFunc(pprof.Symbol))
|
||||
|
||||
exp.Handle("/", w.StatusHandler)
|
||||
exp.Handle("/databases", w.DatabasesHandler)
|
||||
exp.Handle("/alerts", w.AlertsHandler)
|
||||
exp.Handle("/consoles/", http.StripPrefix("/consoles/", w.ConsolesHandler))
|
||||
exp.HandleFunc("/graph", graphHandler)
|
||||
exp.HandleFunc("/heap", dumpHeap)
|
||||
http.Handle("/", prometheus.InstrumentHandler(
|
||||
"/", w.StatusHandler,
|
||||
))
|
||||
http.Handle("/databases", prometheus.InstrumentHandler(
|
||||
"/databases", w.DatabasesHandler,
|
||||
))
|
||||
http.Handle("/alerts", prometheus.InstrumentHandler(
|
||||
"/alerts", w.AlertsHandler,
|
||||
))
|
||||
http.Handle("/consoles/", prometheus.InstrumentHandler(
|
||||
"/consoles/", http.StripPrefix("/static/", http.FileServer(http.Dir("web/static"))),
|
||||
))
|
||||
http.Handle("/graph", prometheus.InstrumentHandler(
|
||||
"/graph", http.HandlerFunc(graphHandler),
|
||||
))
|
||||
http.Handle("/heap", prometheus.InstrumentHandler(
|
||||
"/heap", http.HandlerFunc(dumpHeap),
|
||||
))
|
||||
|
||||
w.MetricsHandler.RegisterHandler()
|
||||
exp.Handle("/metrics", prometheus.DefaultHandler)
|
||||
http.Handle("/metrics", prometheus.Handler())
|
||||
if *useLocalAssets {
|
||||
exp.Handle("/static/", http.StripPrefix("/static/", http.FileServer(http.Dir("web/static"))))
|
||||
http.Handle("/static/", prometheus.InstrumentHandler(
|
||||
"/static/", http.StripPrefix("/static/", http.FileServer(http.Dir("web/static"))),
|
||||
))
|
||||
} else {
|
||||
exp.Handle("/static/", http.StripPrefix("/static/", new(blob.Handler)))
|
||||
http.Handle("/static/", prometheus.InstrumentHandler(
|
||||
"/static/", http.StripPrefix("/static/", new(blob.Handler)),
|
||||
))
|
||||
}
|
||||
|
||||
if *userAssetsPath != "" {
|
||||
exp.Handle("/user/", http.StripPrefix("/user/", http.FileServer(http.Dir(*userAssetsPath))))
|
||||
http.Handle("/user/", prometheus.InstrumentHandler(
|
||||
"/user/", http.StripPrefix("/user/", http.FileServer(http.Dir(*userAssetsPath))),
|
||||
))
|
||||
}
|
||||
|
||||
if *enableQuit {
|
||||
exp.HandleFunc("/-/quit", w.quitHandler)
|
||||
http.Handle("/-/quit", http.HandlerFunc(w.quitHandler))
|
||||
}
|
||||
|
||||
glog.Info("listening on ", *listenAddress)
|
||||
|
||||
return http.ListenAndServe(*listenAddress, exp.DefaultCoarseMux)
|
||||
return http.ListenAndServe(*listenAddress, nil)
|
||||
}
|
||||
|
||||
func (s WebService) quitHandler(w http.ResponseWriter, r *http.Request) {
|
||||
|
|
Loading…
Reference in a new issue