Merge pull request #43 from matttproud/refactor/telemetry/refresh

Upgrade Prometheus to new API.
This commit is contained in:
Matt T. Proud 2013-01-23 08:44:16 -08:00
commit 33b34d71a4
5 changed files with 242 additions and 144 deletions

View file

@ -19,23 +19,29 @@ import (
"github.com/matttproud/golang_instrumentation/metrics"
)
const (
address = "instance"
alive = "alive"
failure = "failure"
outcome = "outcome"
state = "state"
success = "success"
unreachable = "unreachable"
)
var (
networkLatencyHistogram = &metrics.HistogramSpecification{
Starts: metrics.LogarithmicSizedBucketsFor(0, 1000),
BucketMaker: metrics.AccumulatingBucketBuilder(metrics.EvictAndReplaceWith(10, maths.Average), 100),
BucketBuilder: metrics.AccumulatingBucketBuilder(metrics.EvictAndReplaceWith(10, maths.Average), 100),
ReportablePercentiles: []float64{0.01, 0.05, 0.5, 0.90, 0.99},
}
targetsHealthy = &metrics.CounterMetric{}
targetsUnhealthy = &metrics.CounterMetric{}
targetOperationLatencies = metrics.NewHistogram(networkLatencyHistogram)
scrapeLatencyHealthy = metrics.CreateHistogram(networkLatencyHistogram)
scrapeLatencyUnhealthy = metrics.CreateHistogram(networkLatencyHistogram)
targetOperations = metrics.NewCounter()
)
func init() {
registry.Register("targets_healthy_total", "Total number of healthy scrape targets", map[string]string{}, targetsHealthy)
registry.Register("targets_unhealthy_total", "Total number of unhealthy scrape targets", map[string]string{}, targetsUnhealthy)
registry.Register("targets_healthy_scrape_latency_ms", "Scrape latency for healthy targets in milliseconds", map[string]string{}, scrapeLatencyHealthy)
registry.Register("targets_unhealthy_scrape_latency_ms", "Scrape latency for unhealthy targets in milliseconds", map[string]string{}, scrapeLatencyUnhealthy)
registry.Register("prometheus_target_operations_total", "The total numbers of operations of the various targets that are being monitored.", registry.NilLabels, targetOperations)
registry.Register("prometheus_target_operation_latency_ms", "The latencies for various target operations.", registry.NilLabels, targetOperationLatencies)
}

View file

@ -235,11 +235,13 @@ func (t *target) Scrape(earliest time.Time, results chan Result) (err error) {
accumulator := func(d time.Duration) {
ms := float64(d) / float64(time.Millisecond)
if err == nil {
scrapeLatencyHealthy.Add(ms)
} else {
scrapeLatencyUnhealthy.Add(ms)
labels := map[string]string{address: t.Address(), outcome: success}
if err != nil {
labels[outcome] = failure
}
targetOperationLatencies.Add(labels, ms)
targetOperations.Increment(labels)
}
go metrics.InstrumentCall(request, accumulator)

View file

@ -17,20 +17,56 @@ import (
"github.com/matttproud/golang_instrumentation"
"github.com/matttproud/golang_instrumentation/maths"
"github.com/matttproud/golang_instrumentation/metrics"
"time"
)
const (
operation = "operation"
success = "success"
failure = "failure"
result = "result"
appendFingerprints = "append_fingerprints"
appendLabelNameFingerprint = "append_label_name_fingerprint"
appendLabelPairFingerprint = "append_label_pair_fingerprint"
appendSample = "append_sample"
getBoundaryValues = "get_boundary_values"
getFingerprintsForLabelName = "get_fingerprints_for_label_name"
getFingerprintsForLabelSet = "get_fingerprints_for_labelset"
getLabelNameFingerprints = "get_label_name_fingerprints"
getMetricForFingerprint = "get_metric_for_fingerprint"
getRangeValues = "get_range_values"
getValueAtTime = "get_value_at_time"
hasIndexMetric = "has_index_metric"
hasLabelName = "has_label_name"
hasLabelPair = "has_label_pair"
indexMetric = "index_metric"
setLabelNameFingerprints = "set_label_name_fingerprints"
setLabelPairFingerprints = "set_label_pair_fingerprints"
)
var (
diskLatencyHistogram = &metrics.HistogramSpecification{
Starts: metrics.LogarithmicSizedBucketsFor(0, 5000),
BucketMaker: metrics.AccumulatingBucketBuilder(metrics.EvictAndReplaceWith(10, maths.Average), 100),
BucketBuilder: metrics.AccumulatingBucketBuilder(metrics.EvictAndReplaceWith(10, maths.Average), 100),
ReportablePercentiles: []float64{0.01, 0.05, 0.5, 0.90, 0.99},
}
targetsHealthy = &metrics.CounterMetric{}
appendLatency = metrics.CreateHistogram(diskLatencyHistogram)
storageOperations = metrics.NewCounter()
storageLatency = metrics.NewHistogram(diskLatencyHistogram)
)
func init() {
registry.Register("sample_append_disk_latency_microseconds", "Latency for sample appends to disk in microseconds", map[string]string{}, appendLatency)
func recordOutcome(counter metrics.Counter, latency metrics.Histogram, duration time.Duration, err error, success, failure map[string]string) {
labels := success
if err != nil {
labels = failure
}
counter.Increment(labels)
latency.Add(labels, float64(duration/time.Microsecond))
}
func init() {
registry.Register("prometheus_metric_disk_operations_total", "Total number of metric-related disk operations.", registry.NilLabels, storageOperations)
registry.Register("prometheus_metric_disk_latency_microseconds", "Latency for metric disk operations in microseconds.", registry.NilLabels, storageLatency)
}

View file

@ -15,8 +15,6 @@ package leveldb
import (
"code.google.com/p/goprotobuf/proto"
registry "github.com/matttproud/golang_instrumentation"
"github.com/matttproud/golang_instrumentation/metrics"
"github.com/matttproud/prometheus/coding"
"github.com/matttproud/prometheus/coding/indexable"
"github.com/matttproud/prometheus/model"
@ -24,29 +22,48 @@ import (
"time"
)
var (
appendSuccessCount = &metrics.CounterMetric{}
appendFailureCount = &metrics.CounterMetric{}
)
func (l *LevelDBMetricPersistence) setLabelPairFingerprints(labelPair *dto.LabelPair, fingerprints *dto.FingerprintCollection) (err error) {
begin := time.Now()
func init() {
registry.Register("sample_append_success_count_total", "Total successfully appended samples", map[string]string{}, appendSuccessCount)
registry.Register("sample_append_failure_count_total", "Total sample append failures", map[string]string{}, appendFailureCount)
}
defer func() {
duration := time.Now().Sub(begin)
recordOutcome(storageOperations, storageLatency, duration, err, map[string]string{operation: setLabelPairFingerprints, result: success}, map[string]string{operation: setLabelPairFingerprints, result: failure})
}()
func (l *LevelDBMetricPersistence) setLabelPairFingerprints(labelPair *dto.LabelPair, fingerprints *dto.FingerprintCollection) error {
labelPairEncoded := coding.NewProtocolBufferEncoder(labelPair)
fingerprintsEncoded := coding.NewProtocolBufferEncoder(fingerprints)
return l.labelSetToFingerprints.Put(labelPairEncoded, fingerprintsEncoded)
err = l.labelSetToFingerprints.Put(labelPairEncoded, fingerprintsEncoded)
return
}
func (l *LevelDBMetricPersistence) setLabelNameFingerprints(labelName *dto.LabelName, fingerprints *dto.FingerprintCollection) error {
func (l *LevelDBMetricPersistence) setLabelNameFingerprints(labelName *dto.LabelName, fingerprints *dto.FingerprintCollection) (err error) {
begin := time.Now()
defer func() {
duration := time.Now().Sub(begin)
recordOutcome(storageOperations, storageLatency, duration, err, map[string]string{operation: setLabelNameFingerprints, result: success}, map[string]string{operation: setLabelNameFingerprints, result: failure})
}()
labelNameEncoded := coding.NewProtocolBufferEncoder(labelName)
fingerprintsEncoded := coding.NewProtocolBufferEncoder(fingerprints)
return l.labelNameToFingerprints.Put(labelNameEncoded, fingerprintsEncoded)
err = l.labelNameToFingerprints.Put(labelNameEncoded, fingerprintsEncoded)
return
}
func (l *LevelDBMetricPersistence) appendLabelPairFingerprint(labelPair *dto.LabelPair, fingerprint *dto.Fingerprint) (err error) {
begin := time.Now()
defer func() {
duration := time.Now().Sub(begin)
recordOutcome(storageOperations, storageLatency, duration, err, map[string]string{operation: appendLabelPairFingerprint, result: success}, map[string]string{operation: appendLabelPairFingerprint, result: failure})
}()
has, err := l.HasLabelPair(labelPair)
if err != nil {
return
@ -64,10 +81,20 @@ func (l *LevelDBMetricPersistence) appendLabelPairFingerprint(labelPair *dto.Lab
fingerprints.Member = append(fingerprints.Member, fingerprint)
return l.setLabelPairFingerprints(labelPair, fingerprints)
err = l.setLabelPairFingerprints(labelPair, fingerprints)
return
}
func (l *LevelDBMetricPersistence) appendLabelNameFingerprint(labelPair *dto.LabelPair, fingerprint *dto.Fingerprint) (err error) {
begin := time.Now()
defer func() {
duration := time.Now().Sub(begin)
recordOutcome(storageOperations, storageLatency, duration, err, map[string]string{operation: appendLabelNameFingerprint, result: success}, map[string]string{operation: appendLabelNameFingerprint, result: failure})
}()
labelName := &dto.LabelName{
Name: labelPair.Name,
}
@ -89,10 +116,20 @@ func (l *LevelDBMetricPersistence) appendLabelNameFingerprint(labelPair *dto.Lab
fingerprints.Member = append(fingerprints.Member, fingerprint)
return l.setLabelNameFingerprints(labelName, fingerprints)
err = l.setLabelNameFingerprints(labelName, fingerprints)
return
}
func (l *LevelDBMetricPersistence) appendFingerprints(m *dto.Metric) (err error) {
begin := time.Now()
defer func() {
duration := time.Now().Sub(begin)
recordOutcome(storageOperations, storageLatency, duration, err, map[string]string{operation: appendFingerprints, result: success}, map[string]string{operation: appendFingerprints, result: failure})
}()
fingerprintDTO, err := model.MessageToFingerprintDTO(m)
if err != nil {
return
@ -140,64 +177,51 @@ func (l *LevelDBMetricPersistence) appendFingerprints(m *dto.Metric) (err error)
}
func (l *LevelDBMetricPersistence) AppendSample(sample *model.Sample) (err error) {
begin := time.Now()
defer func() {
var m *metrics.CounterMetric = appendSuccessCount
duration := time.Now().Sub(begin)
if err != nil {
m = appendFailureCount
}
m.Increment()
recordOutcome(storageOperations, storageLatency, duration, err, map[string]string{operation: appendSample, result: success}, map[string]string{operation: appendSample, result: failure})
}()
operation := func() {
metricDTO := model.SampleToMetricDTO(sample)
metricDTO := model.SampleToMetricDTO(sample)
indexHas, err := l.hasIndexMetric(metricDTO)
indexHas, err := l.hasIndexMetric(metricDTO)
if err != nil {
return
}
if !indexHas {
err = l.indexMetric(metricDTO)
if err != nil {
return
}
if !indexHas {
err = l.indexMetric(metricDTO)
if err != nil {
return
}
err = l.appendFingerprints(metricDTO)
if err != nil {
return
}
}
fingerprintDTO, err := model.MessageToFingerprintDTO(metricDTO)
if err != nil {
return
}
sampleKeyDTO := &dto.SampleKey{
Fingerprint: fingerprintDTO,
Timestamp: indexable.EncodeTime(sample.Timestamp),
}
sampleValueDTO := &dto.SampleValue{
Value: proto.Float32(float32(sample.Value)),
}
sampleKeyEncoded := coding.NewProtocolBufferEncoder(sampleKeyDTO)
sampleValueEncoded := coding.NewProtocolBufferEncoder(sampleValueDTO)
err = l.metricSamples.Put(sampleKeyEncoded, sampleValueEncoded)
err = l.appendFingerprints(metricDTO)
if err != nil {
return
}
}
// XXX: Problematic with panics.
accumulator := func(d time.Duration) {
ms := float64(d) / float64(time.Microsecond)
appendLatency.Add(ms)
fingerprintDTO, err := model.MessageToFingerprintDTO(metricDTO)
if err != nil {
return
}
metrics.InstrumentCall(operation, accumulator)
sampleKeyDTO := &dto.SampleKey{
Fingerprint: fingerprintDTO,
Timestamp: indexable.EncodeTime(sample.Timestamp),
}
sampleValueDTO := &dto.SampleValue{
Value: proto.Float32(float32(sample.Value)),
}
sampleKeyEncoded := coding.NewProtocolBufferEncoder(sampleKeyDTO)
sampleValueEncoded := coding.NewProtocolBufferEncoder(sampleValueDTO)
err = l.metricSamples.Put(sampleKeyEncoded, sampleValueEncoded)
if err != nil {
return
}
return
}

View file

@ -15,8 +15,6 @@ package leveldb
import (
"code.google.com/p/goprotobuf/proto"
registry "github.com/matttproud/golang_instrumentation"
"github.com/matttproud/golang_instrumentation/metrics"
"github.com/matttproud/prometheus/coding"
"github.com/matttproud/prometheus/coding/indexable"
"github.com/matttproud/prometheus/model"
@ -26,33 +24,6 @@ import (
"time"
)
var (
getLabelNameFingerprintsSuccessCount = &metrics.CounterMetric{}
getLabelNameFingerprintsFailureCount = &metrics.CounterMetric{}
getFingerprintsForLabelSetSuccessCount = &metrics.CounterMetric{}
getFingerprintsForLabelSetFailureCount = &metrics.CounterMetric{}
getFingerprintsForLabelNameSuccessCount = &metrics.CounterMetric{}
getFingerprintsForLabelNameFailureCount = &metrics.CounterMetric{}
getMetricForFingerprintSuccessCount = &metrics.CounterMetric{}
getMetricForFingerprintFailureCount = &metrics.CounterMetric{}
getBoundaryValuesSuccessCount = &metrics.CounterMetric{}
getBoundaryValuesFailureCount = &metrics.CounterMetric{}
)
func init() {
registry.Register("get_label_name_fingerprints_success_count_total", "Successfully fetched label name fingerprints", map[string]string{}, getLabelNameFingerprintsSuccessCount)
registry.Register("get_label_name_fingerprints_failure_count_total", "Failures while fetching label name fingerprints", map[string]string{}, getLabelNameFingerprintsFailureCount)
registry.Register("get_fingerprints_for_label_set_success_count_total", "Successfully fetched label set fingerprints", map[string]string{}, getFingerprintsForLabelSetSuccessCount)
registry.Register("get_fingerprints_for_label_set_failure_count_total", "Failures while fetching label set fingerprints", map[string]string{}, getFingerprintsForLabelSetFailureCount)
registry.Register("get_fingerprints_for_label_name_success_count_total", "Successfully fetched label name fingerprints", map[string]string{}, getFingerprintsForLabelNameSuccessCount)
registry.Register("get_fingerprints_for_label_name_failure_count_total", "Failures while fetching label name fingerprints", map[string]string{}, getFingerprintsForLabelNameFailureCount)
registry.Register("get_metric_for_fingerprint_success_count_total", "Successfully fetched metrics by fingerprint", map[string]string{}, getMetricForFingerprintSuccessCount)
registry.Register("get_metric_for_fingerprint_failure_count_total", "Failures while fetching metrics by fingerprint", map[string]string{}, getMetricForFingerprintFailureCount)
registry.Register("get_boundary_values_success_count_total", "Successfully fetched metric boundary values", map[string]string{}, getBoundaryValuesSuccessCount)
registry.Register("get_boundary_values_failure_count_total", "Failures while fetching boundary values", map[string]string{}, getBoundaryValuesFailureCount)
}
func extractSampleKey(i iterator) (k *dto.SampleKey, err error) {
k = &dto.SampleKey{}
err = proto.Unmarshal(i.Key(), k)
@ -105,27 +76,75 @@ func keyIsAtMostOld(t time.Time) sampleKeyPredicate {
}
}
func (l *LevelDBMetricPersistence) hasIndexMetric(dto *dto.Metric) (bool, error) {
func (l *LevelDBMetricPersistence) hasIndexMetric(dto *dto.Metric) (value bool, err error) {
begin := time.Now()
defer func() {
duration := time.Now().Sub(begin)
recordOutcome(storageOperations, storageLatency, duration, err, map[string]string{operation: hasIndexMetric, result: success}, map[string]string{operation: hasIndexMetric, result: failure})
}()
dtoKey := coding.NewProtocolBufferEncoder(dto)
return l.metricMembershipIndex.Has(dtoKey)
value, err = l.metricMembershipIndex.Has(dtoKey)
return
}
func (l *LevelDBMetricPersistence) indexMetric(dto *dto.Metric) error {
func (l *LevelDBMetricPersistence) indexMetric(dto *dto.Metric) (err error) {
begin := time.Now()
defer func() {
duration := time.Now().Sub(begin)
recordOutcome(storageOperations, storageLatency, duration, err, map[string]string{operation: indexMetric, result: success}, map[string]string{operation: indexMetric, result: failure})
}()
dtoKey := coding.NewProtocolBufferEncoder(dto)
return l.metricMembershipIndex.Put(dtoKey)
err = l.metricMembershipIndex.Put(dtoKey)
return
}
func (l *LevelDBMetricPersistence) HasLabelPair(dto *dto.LabelPair) (bool, error) {
func (l *LevelDBMetricPersistence) HasLabelPair(dto *dto.LabelPair) (value bool, err error) {
begin := time.Now()
defer func() {
duration := time.Now().Sub(begin)
recordOutcome(storageOperations, storageLatency, duration, err, map[string]string{operation: hasLabelPair, result: success}, map[string]string{operation: hasLabelPair, result: failure})
}()
dtoKey := coding.NewProtocolBufferEncoder(dto)
return l.labelSetToFingerprints.Has(dtoKey)
value, err = l.labelSetToFingerprints.Has(dtoKey)
return
}
func (l *LevelDBMetricPersistence) HasLabelName(dto *dto.LabelName) (bool, error) {
func (l *LevelDBMetricPersistence) HasLabelName(dto *dto.LabelName) (value bool, err error) {
begin := time.Now()
defer func() {
duration := time.Now().Sub(begin)
recordOutcome(storageOperations, storageLatency, duration, err, map[string]string{operation: hasLabelName, result: success}, map[string]string{operation: hasLabelName, result: failure})
}()
dtoKey := coding.NewProtocolBufferEncoder(dto)
return l.labelNameToFingerprints.Has(dtoKey)
value, err = l.labelNameToFingerprints.Has(dtoKey)
return
}
func (l *LevelDBMetricPersistence) getFingerprintsForLabelSet(p *dto.LabelPair) (c *dto.FingerprintCollection, err error) {
begin := time.Now()
defer func() {
duration := time.Now().Sub(begin)
recordOutcome(storageOperations, storageLatency, duration, err, map[string]string{operation: getFingerprintsForLabelSet, result: success}, map[string]string{operation: getFingerprintsForLabelSet, result: failure})
}()
dtoKey := coding.NewProtocolBufferEncoder(p)
get, err := l.labelSetToFingerprints.Get(dtoKey)
if err != nil {
@ -140,13 +159,12 @@ func (l *LevelDBMetricPersistence) getFingerprintsForLabelSet(p *dto.LabelPair)
// XXX: Delete me and replace with GetFingerprintsForLabelName.
func (l *LevelDBMetricPersistence) GetLabelNameFingerprints(n *dto.LabelName) (c *dto.FingerprintCollection, err error) {
defer func() {
m := getLabelNameFingerprintsSuccessCount
if err != nil {
m = getLabelNameFingerprintsFailureCount
}
begin := time.Now()
m.Increment()
defer func() {
duration := time.Now().Sub(begin)
recordOutcome(storageOperations, storageLatency, duration, err, map[string]string{operation: getLabelNameFingerprints, result: success}, map[string]string{operation: getLabelNameFingerprints, result: failure})
}()
dtoKey := coding.NewProtocolBufferEncoder(n)
@ -162,13 +180,12 @@ func (l *LevelDBMetricPersistence) GetLabelNameFingerprints(n *dto.LabelName) (c
}
func (l *LevelDBMetricPersistence) GetFingerprintsForLabelSet(labelSet *model.LabelSet) (fps []*model.Fingerprint, err error) {
defer func() {
m := getFingerprintsForLabelSetSuccessCount
if err != nil {
m = getFingerprintsForLabelSetFailureCount
}
begin := time.Now()
m.Increment()
defer func() {
duration := time.Now().Sub(begin)
recordOutcome(storageOperations, storageLatency, duration, err, map[string]string{operation: getFingerprintsForLabelSet, result: success}, map[string]string{operation: getFingerprintsForLabelSet, result: failure})
}()
sets := []utility.Set{}
@ -214,13 +231,12 @@ func (l *LevelDBMetricPersistence) GetFingerprintsForLabelSet(labelSet *model.La
}
func (l *LevelDBMetricPersistence) GetFingerprintsForLabelName(labelName *model.LabelName) (fps []*model.Fingerprint, err error) {
defer func() {
m := getFingerprintsForLabelNameSuccessCount
if err != nil {
m = getFingerprintsForLabelNameFailureCount
}
begin := time.Now()
m.Increment()
defer func() {
duration := time.Now().Sub(begin)
recordOutcome(storageOperations, storageLatency, duration, err, map[string]string{operation: getFingerprintsForLabelName, result: success}, map[string]string{operation: getFingerprintsForLabelName, result: failure})
}()
fps = make([]*model.Fingerprint, 0, 0)
@ -246,13 +262,12 @@ func (l *LevelDBMetricPersistence) GetFingerprintsForLabelName(labelName *model.
}
func (l *LevelDBMetricPersistence) GetMetricForFingerprint(f *model.Fingerprint) (m *model.Metric, err error) {
defer func() {
m := getMetricForFingerprintSuccessCount
if err != nil {
m = getMetricForFingerprintFailureCount
}
begin := time.Now()
m.Increment()
defer func() {
duration := time.Now().Sub(begin)
recordOutcome(storageOperations, storageLatency, duration, err, map[string]string{operation: getMetricForFingerprint, result: success}, map[string]string{operation: getMetricForFingerprint, result: failure})
}()
raw, err := l.fingerprintToMetrics.Get(coding.NewProtocolBufferEncoder(model.FingerprintToDTO(f)))
@ -275,13 +290,12 @@ func (l *LevelDBMetricPersistence) GetMetricForFingerprint(f *model.Fingerprint)
}
func (l *LevelDBMetricPersistence) GetBoundaryValues(m *model.Metric, i *model.Interval, s *metric.StalenessPolicy) (open *model.Sample, end *model.Sample, err error) {
defer func() {
m := getBoundaryValuesSuccessCount
if err != nil {
m = getBoundaryValuesFailureCount
}
begin := time.Now()
m.Increment()
defer func() {
duration := time.Now().Sub(begin)
recordOutcome(storageOperations, storageLatency, duration, err, map[string]string{operation: getBoundaryValues, result: success}, map[string]string{operation: getBoundaryValues, result: failure})
}()
// XXX: Maybe we will want to emit incomplete sets?
@ -325,6 +339,14 @@ type iterator interface {
}
func (l *LevelDBMetricPersistence) GetValueAtTime(m *model.Metric, t *time.Time, s *metric.StalenessPolicy) (sample *model.Sample, err error) {
begin := time.Now()
defer func() {
duration := time.Now().Sub(begin)
recordOutcome(storageOperations, storageLatency, duration, err, map[string]string{operation: getValueAtTime, result: success}, map[string]string{operation: getValueAtTime, result: failure})
}()
d := model.MetricToDTO(m)
f, err := model.MessageToFingerprintDTO(d)
@ -536,6 +558,14 @@ func (l *LevelDBMetricPersistence) GetValueAtTime(m *model.Metric, t *time.Time,
}
func (l *LevelDBMetricPersistence) GetRangeValues(m *model.Metric, i *model.Interval, s *metric.StalenessPolicy) (v *model.SampleSet, err error) {
begin := time.Now()
defer func() {
duration := time.Now().Sub(begin)
recordOutcome(storageOperations, storageLatency, duration, err, map[string]string{operation: getRangeValues, result: success}, map[string]string{operation: getRangeValues, result: failure})
}()
d := model.MetricToDTO(m)
f, err := model.MessageToFingerprintDTO(d)