prometheus/retrieval/target.go
beorn7 33a50e69f7 Fix a deadlock
Double acquisition of the RLock usually doesn't blow up, but if the
write lock is called for between the two RLock's, we are deadlocked.

This deadlock does not exist in release-0.17, BTW.
2016-02-29 16:34:29 +01:00

679 lines
17 KiB
Go

// Copyright 2013 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package retrieval
import (
"errors"
"fmt"
"io"
"io/ioutil"
"net/http"
"net/url"
"strings"
"sync"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/expfmt"
"github.com/prometheus/common/log"
"github.com/prometheus/common/model"
"golang.org/x/net/context"
"golang.org/x/net/context/ctxhttp"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/storage/local"
"github.com/prometheus/prometheus/util/httputil"
)
const (
scrapeHealthMetricName = "up"
scrapeDurationMetricName = "scrape_duration_seconds"
// Capacity of the channel to buffer samples during ingestion.
ingestedSamplesCap = 256
// Constants for instrumentation.
namespace = "prometheus"
interval = "interval"
)
var (
errSkippedScrape = errors.New("scrape skipped due to throttled ingestion")
targetIntervalLength = prometheus.NewSummaryVec(
prometheus.SummaryOpts{
Namespace: namespace,
Name: "target_interval_length_seconds",
Help: "Actual intervals between scrapes.",
Objectives: map[float64]float64{0.01: 0.001, 0.05: 0.005, 0.5: 0.05, 0.90: 0.01, 0.99: 0.001},
},
[]string{interval},
)
targetSkippedScrapes = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: namespace,
Name: "target_skipped_scrapes_total",
Help: "Total number of scrapes that were skipped because the metric storage was throttled.",
},
[]string{interval},
)
)
func init() {
prometheus.MustRegister(targetIntervalLength)
prometheus.MustRegister(targetSkippedScrapes)
}
// TargetHealth describes the health state of a target.
type TargetHealth int
func (t TargetHealth) String() string {
switch t {
case HealthUnknown:
return "unknown"
case HealthGood:
return "up"
case HealthBad:
return "down"
}
panic("unknown state")
}
func (t TargetHealth) value() model.SampleValue {
if t == HealthGood {
return 1
}
return 0
}
const (
// HealthUnknown is the state of a Target before it is first scraped.
HealthUnknown TargetHealth = iota
// HealthGood is the state of a Target that has been successfully scraped.
HealthGood
// HealthBad is the state of a Target that was scraped unsuccessfully.
HealthBad
)
// TargetStatus contains information about the current status of a scrape target.
type TargetStatus struct {
lastError error
lastScrape time.Time
health TargetHealth
mu sync.RWMutex
}
// LastError returns the error encountered during the last scrape.
func (ts *TargetStatus) LastError() error {
ts.mu.RLock()
defer ts.mu.RUnlock()
return ts.lastError
}
// LastScrape returns the time of the last scrape.
func (ts *TargetStatus) LastScrape() time.Time {
ts.mu.RLock()
defer ts.mu.RUnlock()
return ts.lastScrape
}
// Health returns the last known health state of the target.
func (ts *TargetStatus) Health() TargetHealth {
ts.mu.RLock()
defer ts.mu.RUnlock()
return ts.health
}
func (ts *TargetStatus) setLastScrape(t time.Time) {
ts.mu.Lock()
defer ts.mu.Unlock()
ts.lastScrape = t
}
func (ts *TargetStatus) setLastError(err error) {
ts.mu.Lock()
defer ts.mu.Unlock()
if err == nil {
ts.health = HealthGood
} else {
ts.health = HealthBad
}
ts.lastError = err
}
// Target refers to a singular HTTP or HTTPS endpoint.
type Target struct {
// The status object for the target. It is only set once on initialization.
status *TargetStatus
// Closing scraperStopping signals that scraping should stop.
scraperStopping chan struct{}
// Closing scraperStopped signals that scraping has been stopped.
scraperStopped chan struct{}
// Mutex protects the members below.
sync.RWMutex
scrapeConfig *config.ScrapeConfig
// Labels before any processing.
metaLabels model.LabelSet
// Any labels that are added to this target and its metrics.
labels model.LabelSet
// The HTTP client used to scrape the target's endpoint.
httpClient *http.Client
}
// NewTarget creates a reasonably configured target for querying.
func NewTarget(cfg *config.ScrapeConfig, labels, metaLabels model.LabelSet) (*Target, error) {
t := &Target{
status: &TargetStatus{},
scraperStopping: make(chan struct{}),
scraperStopped: make(chan struct{}),
}
err := t.Update(cfg, labels, metaLabels)
return t, err
}
// Status returns the status of the target.
func (t *Target) Status() *TargetStatus {
return t.status
}
// Update overwrites settings in the target that are derived from the job config
// it belongs to.
func (t *Target) Update(cfg *config.ScrapeConfig, labels, metaLabels model.LabelSet) error {
t.Lock()
t.scrapeConfig = cfg
t.labels = labels
t.metaLabels = metaLabels
t.Unlock()
httpClient, err := t.client()
if err != nil {
return fmt.Errorf("cannot create HTTP client: %s", err)
}
t.Lock()
t.httpClient = httpClient
t.Unlock()
return nil
}
func newHTTPClient(cfg *config.ScrapeConfig) (*http.Client, error) {
tlsOpts := httputil.TLSOptions{
InsecureSkipVerify: cfg.TLSConfig.InsecureSkipVerify,
CAFile: cfg.TLSConfig.CAFile,
}
if len(cfg.TLSConfig.CertFile) > 0 && len(cfg.TLSConfig.KeyFile) > 0 {
tlsOpts.CertFile = cfg.TLSConfig.CertFile
tlsOpts.KeyFile = cfg.TLSConfig.KeyFile
}
tlsConfig, err := httputil.NewTLSConfig(tlsOpts)
if err != nil {
return nil, err
}
// The only timeout we care about is the configured scrape timeout.
// It is applied on request. So we leave out any timings here.
var rt http.RoundTripper = &http.Transport{
Proxy: http.ProxyURL(cfg.ProxyURL.URL),
DisableKeepAlives: true,
TLSClientConfig: tlsConfig,
}
// If a bearer token is provided, create a round tripper that will set the
// Authorization header correctly on each request.
bearerToken := cfg.BearerToken
if len(bearerToken) == 0 && len(cfg.BearerTokenFile) > 0 {
b, err := ioutil.ReadFile(cfg.BearerTokenFile)
if err != nil {
return nil, fmt.Errorf("unable to read bearer token file %s: %s", cfg.BearerTokenFile, err)
}
bearerToken = string(b)
}
if len(bearerToken) > 0 {
rt = httputil.NewBearerAuthRoundTripper(bearerToken, rt)
}
if cfg.BasicAuth != nil {
rt = httputil.NewBasicAuthRoundTripper(cfg.BasicAuth.Username, cfg.BasicAuth.Password, rt)
}
// Return a new client with the configured round tripper.
return httputil.NewClient(rt), nil
}
func (t *Target) String() string {
return t.host()
}
// fingerprint returns an identifying hash for the target.
func (t *Target) fingerprint() model.Fingerprint {
t.RLock()
defer t.RUnlock()
return t.labels.Fingerprint()
}
// offset returns the time until the next scrape cycle for the target.
func (t *Target) offset(interval time.Duration) time.Duration {
now := time.Now().UnixNano()
var (
base = now % int64(interval)
offset = uint64(t.fingerprint()) % uint64(interval)
next = base + int64(offset)
)
if next > int64(interval) {
next -= int64(interval)
}
return time.Duration(next)
}
func (t *Target) client() (*http.Client, error) {
t.RLock()
defer t.RUnlock()
return newHTTPClient(t.scrapeConfig)
}
func (t *Target) interval() time.Duration {
t.RLock()
defer t.RUnlock()
return time.Duration(t.scrapeConfig.ScrapeInterval)
}
func (t *Target) timeout() time.Duration {
t.RLock()
defer t.RUnlock()
return time.Duration(t.scrapeConfig.ScrapeTimeout)
}
func (t *Target) scheme() string {
t.RLock()
defer t.RUnlock()
return string(t.labels[model.SchemeLabel])
}
func (t *Target) host() string {
t.RLock()
defer t.RUnlock()
return string(t.labels[model.AddressLabel])
}
func (t *Target) path() string {
t.RLock()
defer t.RUnlock()
return string(t.labels[model.MetricsPathLabel])
}
// wrapAppender wraps a SampleAppender for samples ingested from the target.
// RLock must be acquired by the caller.
func (t *Target) wrapAppender(app storage.SampleAppender) storage.SampleAppender {
// The relabelAppender has to be inside the label-modifying appenders
// so the relabeling rules are applied to the correct label set.
if mrc := t.scrapeConfig.MetricRelabelConfigs; len(mrc) > 0 {
app = relabelAppender{
SampleAppender: app,
relabelings: mrc,
}
}
if t.scrapeConfig.HonorLabels {
app = honorLabelsAppender{
SampleAppender: app,
labels: t.unlockedLabels(),
}
} else {
app = ruleLabelsAppender{
SampleAppender: app,
labels: t.unlockedLabels(),
}
}
return app
}
// wrapReportingAppender wraps an appender for target status report samples.
// It ignores any relabeling rules set for the target.
// RLock must not be acquired by the caller.
func (t *Target) wrapReportingAppender(app storage.SampleAppender) storage.SampleAppender {
return ruleLabelsAppender{
SampleAppender: app,
labels: t.Labels(),
}
}
// URL returns a copy of the target's URL.
func (t *Target) URL() *url.URL {
t.RLock()
defer t.RUnlock()
params := url.Values{}
for k, v := range t.scrapeConfig.Params {
params[k] = make([]string, len(v))
copy(params[k], v)
}
for k, v := range t.labels {
if !strings.HasPrefix(string(k), model.ParamLabelPrefix) {
continue
}
ks := string(k[len(model.ParamLabelPrefix):])
if len(params[ks]) > 0 {
params[ks][0] = string(v)
} else {
params[ks] = []string{string(v)}
}
}
return &url.URL{
Scheme: string(t.labels[model.SchemeLabel]),
Host: string(t.labels[model.AddressLabel]),
Path: string(t.labels[model.MetricsPathLabel]),
RawQuery: params.Encode(),
}
}
// InstanceIdentifier returns the identifier for the target.
func (t *Target) InstanceIdentifier() string {
return t.host()
}
// RunScraper implements Target.
func (t *Target) RunScraper(sampleAppender storage.SampleAppender) {
defer close(t.scraperStopped)
lastScrapeInterval := t.interval()
log.Debugf("Starting scraper for target %v...", t)
select {
case <-time.After(t.offset(lastScrapeInterval)):
// Continue after scraping offset.
case <-t.scraperStopping:
return
}
ticker := time.NewTicker(lastScrapeInterval)
defer ticker.Stop()
t.scrape(sampleAppender)
// Explanation of the contraption below:
//
// In case t.scraperStopping has something to receive, we want to read
// from that channel rather than starting a new scrape (which might take very
// long). That's why the outer select has no ticker.C. Should t.scraperStopping
// not have anything to receive, we go into the inner select, where ticker.C
// is in the mix.
for {
select {
case <-t.scraperStopping:
return
default:
select {
case <-t.scraperStopping:
return
case <-ticker.C:
took := time.Since(t.status.LastScrape())
intervalStr := lastScrapeInterval.String()
// On changed scrape interval the new interval becomes effective
// after the next scrape.
if iv := t.interval(); iv != lastScrapeInterval {
ticker.Stop()
ticker = time.NewTicker(iv)
lastScrapeInterval = iv
}
targetIntervalLength.WithLabelValues(intervalStr).Observe(
float64(took) / float64(time.Second), // Sub-second precision.
)
if sampleAppender.NeedsThrottling() {
targetSkippedScrapes.WithLabelValues(intervalStr).Inc()
t.status.setLastError(errSkippedScrape)
continue
}
t.scrape(sampleAppender)
}
}
}
}
// StopScraper implements Target.
func (t *Target) StopScraper() {
log.Debugf("Stopping scraper for target %v...", t)
close(t.scraperStopping)
<-t.scraperStopped
log.Debugf("Scraper for target %v stopped.", t)
}
const acceptHeader = `application/vnd.google.protobuf;proto=io.prometheus.client.MetricFamily;encoding=delimited;q=0.7,text/plain;version=0.0.4;q=0.3,application/json;schema="prometheus/telemetry";version=0.0.2;q=0.2,*/*;q=0.1`
func (t *Target) scrape(appender storage.SampleAppender) error {
var (
err error
start = time.Now()
)
defer func(appender storage.SampleAppender) {
t.report(appender, start, time.Since(start), err)
}(appender)
t.RLock()
appender = t.wrapAppender(appender)
client := t.httpClient
t.RUnlock()
req, err := http.NewRequest("GET", t.URL().String(), nil)
if err != nil {
return err
}
req.Header.Add("Accept", acceptHeader)
ctx, _ := context.WithTimeout(context.Background(), t.timeout())
resp, err := ctxhttp.Do(ctx, client, req)
if err != nil {
return err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("server returned HTTP status %s", resp.Status)
}
dec := expfmt.NewDecoder(resp.Body, expfmt.ResponseFormat(resp.Header))
sdec := expfmt.SampleDecoder{
Dec: dec,
Opts: &expfmt.DecodeOptions{
Timestamp: model.TimeFromUnixNano(start.UnixNano()),
},
}
var (
samples model.Vector
numOutOfOrder int
logger = log.With("target", t.InstanceIdentifier())
)
for {
if err = sdec.Decode(&samples); err != nil {
break
}
for _, s := range samples {
err := appender.Append(s)
if err != nil {
if err == local.ErrOutOfOrderSample {
numOutOfOrder++
} else {
logger.With("sample", s).Warnf("Error inserting sample: %s", err)
}
}
}
}
if numOutOfOrder > 0 {
logger.With("numDropped", numOutOfOrder).Warn("Error on ingesting out-of-order samples")
}
if err == io.EOF {
// Set err to nil since it is used in the scrape health recording.
err = nil
}
return err
}
func (t *Target) report(app storage.SampleAppender, start time.Time, duration time.Duration, err error) {
t.status.setLastScrape(start)
t.status.setLastError(err)
ts := model.TimeFromUnixNano(start.UnixNano())
var health model.SampleValue
if err == nil {
health = 1
}
healthSample := &model.Sample{
Metric: model.Metric{
model.MetricNameLabel: scrapeHealthMetricName,
},
Timestamp: ts,
Value: health,
}
durationSample := &model.Sample{
Metric: model.Metric{
model.MetricNameLabel: scrapeDurationMetricName,
},
Timestamp: ts,
Value: model.SampleValue(float64(duration) / float64(time.Second)),
}
app = t.wrapReportingAppender(app)
app.Append(healthSample)
app.Append(durationSample)
}
// Merges the ingested sample's metric with the label set. On a collision the
// value of the ingested label is stored in a label prefixed with 'exported_'.
type ruleLabelsAppender struct {
storage.SampleAppender
labels model.LabelSet
}
func (app ruleLabelsAppender) Append(s *model.Sample) error {
for ln, lv := range app.labels {
if v, ok := s.Metric[ln]; ok && v != "" {
s.Metric[model.ExportedLabelPrefix+ln] = v
}
s.Metric[ln] = lv
}
return app.SampleAppender.Append(s)
}
type honorLabelsAppender struct {
storage.SampleAppender
labels model.LabelSet
}
// Merges the sample's metric with the given labels if the label is not
// already present in the metric.
// This also considers labels explicitly set to the empty string.
func (app honorLabelsAppender) Append(s *model.Sample) error {
for ln, lv := range app.labels {
if _, ok := s.Metric[ln]; !ok {
s.Metric[ln] = lv
}
}
return app.SampleAppender.Append(s)
}
// Applies a set of relabel configurations to the sample's metric
// before actually appending it.
type relabelAppender struct {
storage.SampleAppender
relabelings []*config.RelabelConfig
}
func (app relabelAppender) Append(s *model.Sample) error {
labels, err := Relabel(model.LabelSet(s.Metric), app.relabelings...)
if err != nil {
return fmt.Errorf("metric relabeling error %s: %s", s.Metric, err)
}
// Check if the timeseries was dropped.
if labels == nil {
return nil
}
s.Metric = model.Metric(labels)
return app.SampleAppender.Append(s)
}
// Labels returns a copy of the set of all public labels of the target.
func (t *Target) Labels() model.LabelSet {
t.RLock()
defer t.RUnlock()
return t.unlockedLabels()
}
// unlockedLabels does the same as Labels but does not lock the mutex (useful
// for internal usage when the mutex is already locked).
func (t *Target) unlockedLabels() model.LabelSet {
lset := make(model.LabelSet, len(t.labels))
for ln, lv := range t.labels {
if !strings.HasPrefix(string(ln), model.ReservedLabelPrefix) {
lset[ln] = lv
}
}
if _, ok := lset[model.InstanceLabel]; !ok {
lset[model.InstanceLabel] = t.labels[model.AddressLabel]
}
return lset
}
// MetaLabels returns a copy of the target's labels before any processing.
func (t *Target) MetaLabels() model.LabelSet {
t.RLock()
defer t.RUnlock()
return t.metaLabels.Clone()
}