Avoid inter-component blocking if ingestion/scraping blocks.

Appending to the storage can block for a long time. Timing out
scrapes can also cause longer blocks. This commit prevents those
blocks from affecting components other than the target itself.
Also the Target interface was removed.
This commit is contained in:
Fabian Reinartz 2015-05-18 13:14:41 +02:00
parent 1a2d57b45c
commit 385919a65a
7 changed files with 102 additions and 137 deletions

View file

@ -67,72 +67,35 @@ func init() {
prometheus.MustRegister(targetIntervalLength)
}
// TargetState describes the state of a Target.
type TargetState int
// TargetHealth describes the health state of a target.
type TargetHealth int
func (t TargetState) String() string {
func (t TargetHealth) String() string {
switch t {
case Unknown:
case HealthUnknown:
return "UNKNOWN"
case Healthy:
case HealthGood:
return "HEALTHY"
case Unhealthy:
case HealthBad:
return "UNHEALTHY"
}
panic("unknown state")
}
const (
// Unknown is the state of a Target before it is first scraped.
Unknown TargetState = iota
HealthUnknown TargetHealth = iota
// Healthy is the state of a Target that has been successfully scraped.
Healthy
HealthGood
// Unhealthy is the state of a Target that was scraped unsuccessfully.
Unhealthy
HealthBad
)
// A Target represents an endpoint that should be interrogated for metrics.
//
// The protocol described by this type will likely change in future iterations,
// as it offers no good support for aggregated targets and fan out. Thusly,
// it is likely that the current Target and target uses will be
// wrapped with some resolver type.
//
// For the future, the Target protocol will abstract away the exact means that
// metrics are retrieved and deserialized from the given instance to which it
// refers.
//
// Target implements extraction.Ingester.
type Target interface {
extraction.Ingester
// Status returns the current status of the target.
Status() *TargetStatus
// The URL to which the Target corresponds. Out of all of the available
// points in this interface, this one is the best candidate to change given
// the ways to express the endpoint.
URL() string
// Used to populate the `instance` label in metrics.
InstanceIdentifier() string
// Return the labels describing the targets. These are the base labels
// as well as internal labels.
fullLabels() clientmodel.LabelSet
// Return the target's base labels.
BaseLabels() clientmodel.LabelSet
// Start scraping the target in regular intervals.
RunScraper(storage.SampleAppender)
// Stop scraping, synchronous.
StopScraper()
// Update the target's state.
Update(*config.ScrapeConfig, clientmodel.LabelSet)
}
// TargetStatus contains information about the current status of a scrape target.
type TargetStatus struct {
lastError error
lastScrape time.Time
state TargetState
health TargetHealth
mu sync.RWMutex
}
@ -151,11 +114,11 @@ func (ts *TargetStatus) LastScrape() time.Time {
return ts.lastScrape
}
// State returns the last known health state of the target.
func (ts *TargetStatus) State() TargetState {
// Health returns the last known health state of the target.
func (ts *TargetStatus) Health() TargetHealth {
ts.mu.RLock()
defer ts.mu.RUnlock()
return ts.state
return ts.health
}
func (ts *TargetStatus) setLastScrape(t time.Time) {
@ -168,15 +131,20 @@ func (ts *TargetStatus) setLastError(err error) {
ts.mu.Lock()
defer ts.mu.Unlock()
if err == nil {
ts.state = Healthy
ts.health = HealthGood
} else {
ts.state = Unhealthy
ts.health = HealthBad
}
ts.lastError = err
}
// target is a Target that refers to a singular HTTP or HTTPS endpoint.
type target struct {
// Target refers to a singular HTTP or HTTPS endpoint.
type Target struct {
// The status object for the target. It is only set once on initialization.
status *TargetStatus
// The HTTP client used to scrape the target's endpoint.
httpClient *http.Client
// Closing scraperStopping signals that scraping should stop.
scraperStopping chan struct{}
// Closing scraperStopped signals that scraping has been stopped.
@ -184,14 +152,9 @@ type target struct {
// Channel to buffer ingested samples.
ingestedSamples chan clientmodel.Samples
// The status object for the target. It is only set once on initialization.
status *TargetStatus
// The HTTP client used to scrape the target's endpoint.
httpClient *http.Client
// Mutex protects the members below.
sync.RWMutex
// url is the URL to be scraped. Its host is immutable.
url *url.URL
// Any base labels that are added to this target and its metrics.
baseLabels clientmodel.LabelSet
@ -202,8 +165,8 @@ type target struct {
}
// NewTarget creates a reasonably configured target for querying.
func NewTarget(cfg *config.ScrapeConfig, baseLabels clientmodel.LabelSet) Target {
t := &target{
func NewTarget(cfg *config.ScrapeConfig, baseLabels clientmodel.LabelSet) *Target {
t := &Target{
url: &url.URL{
Host: string(baseLabels[clientmodel.AddressLabel]),
},
@ -215,14 +178,14 @@ func NewTarget(cfg *config.ScrapeConfig, baseLabels clientmodel.LabelSet) Target
return t
}
// Status implements the Target interface.
func (t *target) Status() *TargetStatus {
// Status returns the status of the target.
func (t *Target) Status() *TargetStatus {
return t.status
}
// Update overwrites settings in the target that are derived from the job config
// it belongs to.
func (t *target) Update(cfg *config.ScrapeConfig, baseLabels clientmodel.LabelSet) {
func (t *Target) Update(cfg *config.ScrapeConfig, baseLabels clientmodel.LabelSet) {
t.Lock()
defer t.Unlock()
@ -248,12 +211,15 @@ func (t *target) Update(cfg *config.ScrapeConfig, baseLabels clientmodel.LabelSe
}
}
func (t *target) String() string {
func (t *Target) String() string {
return t.url.Host
}
// Ingest implements Target and extraction.Ingester.
func (t *target) Ingest(s clientmodel.Samples) error {
// Ingest implements an extraction.Ingester.
func (t *Target) Ingest(s clientmodel.Samples) error {
t.RLock()
deadline := t.deadline
t.RUnlock()
// Since the regular case is that ingestedSamples is ready to receive,
// first try without setting a timeout so that we don't need to allocate
// a timer most of the time.
@ -264,14 +230,17 @@ func (t *target) Ingest(s clientmodel.Samples) error {
select {
case t.ingestedSamples <- s:
return nil
case <-time.After(t.deadline / 10):
case <-time.After(deadline / 10):
return errIngestChannelFull
}
}
}
// Ensure that Target implements extraction.Ingester at compile time.
var _ extraction.Ingester = (*Target)(nil)
// RunScraper implements Target.
func (t *target) RunScraper(sampleAppender storage.SampleAppender) {
func (t *Target) RunScraper(sampleAppender storage.SampleAppender) {
defer close(t.scraperStopped)
t.RLock()
@ -316,7 +285,7 @@ func (t *target) RunScraper(sampleAppender storage.SampleAppender) {
intervalStr := lastScrapeInterval.String()
t.Lock()
t.RLock()
// On changed scrape interval the new interval becomes effective
// after the next scrape.
if lastScrapeInterval != t.scrapeInterval {
@ -324,7 +293,7 @@ func (t *target) RunScraper(sampleAppender storage.SampleAppender) {
ticker = time.NewTicker(t.scrapeInterval)
lastScrapeInterval = t.scrapeInterval
}
t.Unlock()
t.RUnlock()
targetIntervalLength.WithLabelValues(intervalStr).Observe(
float64(took) / float64(time.Second), // Sub-second precision.
@ -336,7 +305,7 @@ func (t *target) RunScraper(sampleAppender storage.SampleAppender) {
}
// StopScraper implements Target.
func (t *target) StopScraper() {
func (t *Target) StopScraper() {
glog.V(1).Infof("Stopping scraper for target %v...", t)
close(t.scraperStopping)
@ -347,18 +316,16 @@ func (t *target) StopScraper() {
const acceptHeader = `application/vnd.google.protobuf;proto=io.prometheus.client.MetricFamily;encoding=delimited;q=0.7,text/plain;version=0.0.4;q=0.3,application/json;schema="prometheus/telemetry";version=0.0.2;q=0.2,*/*;q=0.1`
func (t *target) scrape(sampleAppender storage.SampleAppender) (err error) {
t.RLock()
func (t *Target) scrape(sampleAppender storage.SampleAppender) (err error) {
start := time.Now()
baseLabels := t.BaseLabels()
defer func() {
t.RUnlock()
t.status.setLastError(err)
t.recordScrapeHealth(sampleAppender, clientmodel.TimestampFromTime(start), time.Since(start))
recordScrapeHealth(sampleAppender, clientmodel.TimestampFromTime(start), baseLabels, t.status.Health(), time.Since(start))
}()
req, err := http.NewRequest("GET", t.url.String(), nil)
req, err := http.NewRequest("GET", t.URL(), nil)
if err != nil {
panic(err)
}
@ -390,7 +357,7 @@ func (t *target) scrape(sampleAppender storage.SampleAppender) (err error) {
for samples := range t.ingestedSamples {
for _, s := range samples {
s.Metric.MergeFromLabelSet(t.baseLabels, clientmodel.ExporterLabelPrefix)
s.Metric.MergeFromLabelSet(baseLabels, clientmodel.ExporterLabelPrefix)
sampleAppender.Append(s)
}
}
@ -398,19 +365,19 @@ func (t *target) scrape(sampleAppender storage.SampleAppender) (err error) {
}
// URL implements Target.
func (t *target) URL() string {
func (t *Target) URL() string {
t.RLock()
defer t.RUnlock()
return t.url.String()
}
// InstanceIdentifier implements Target.
func (t *target) InstanceIdentifier() string {
// InstanceIdentifier returns the identifier for the target.
func (t *Target) InstanceIdentifier() string {
return t.url.Host
}
// fullLabels implements Target.
func (t *target) fullLabels() clientmodel.LabelSet {
// fullLabels returns the base labels plus internal labels defining the target.
func (t *Target) fullLabels() clientmodel.LabelSet {
t.RLock()
defer t.RUnlock()
lset := make(clientmodel.LabelSet, len(t.baseLabels)+2)
@ -422,8 +389,8 @@ func (t *target) fullLabels() clientmodel.LabelSet {
return lset
}
// BaseLabels implements Target.
func (t *target) BaseLabels() clientmodel.LabelSet {
// BaseLabels returns a copy of the target's base labels.
func (t *Target) BaseLabels() clientmodel.LabelSet {
t.RLock()
defer t.RUnlock()
lset := make(clientmodel.LabelSet, len(t.baseLabels))
@ -433,22 +400,26 @@ func (t *target) BaseLabels() clientmodel.LabelSet {
return lset
}
func (t *target) recordScrapeHealth(sampleAppender storage.SampleAppender, timestamp clientmodel.Timestamp, scrapeDuration time.Duration) {
t.RLock()
healthMetric := make(clientmodel.Metric, len(t.baseLabels)+1)
durationMetric := make(clientmodel.Metric, len(t.baseLabels)+1)
func recordScrapeHealth(
sampleAppender storage.SampleAppender,
timestamp clientmodel.Timestamp,
baseLabels clientmodel.LabelSet,
health TargetHealth,
scrapeDuration time.Duration,
) {
healthMetric := make(clientmodel.Metric, len(baseLabels)+1)
durationMetric := make(clientmodel.Metric, len(baseLabels)+1)
healthMetric[clientmodel.MetricNameLabel] = clientmodel.LabelValue(scrapeHealthMetricName)
durationMetric[clientmodel.MetricNameLabel] = clientmodel.LabelValue(scrapeDurationMetricName)
for label, value := range t.baseLabels {
for label, value := range baseLabels {
healthMetric[label] = value
durationMetric[label] = value
}
t.RUnlock()
healthValue := clientmodel.SampleValue(0)
if t.status.State() == Healthy {
if health == HealthGood {
healthValue = clientmodel.SampleValue(1)
}

View file

@ -29,10 +29,6 @@ import (
"github.com/prometheus/prometheus/utility"
)
func TestTargetInterface(t *testing.T) {
var _ Target = &target{}
}
func TestBaseLabels(t *testing.T) {
target := newTestTarget("example.com:80", 0, clientmodel.LabelSet{"job": "some_job", "foo": "bar"})
want := clientmodel.LabelSet{
@ -50,8 +46,8 @@ func TestTargetScrapeUpdatesState(t *testing.T) {
testTarget := newTestTarget("bad schema", 0, nil)
testTarget.scrape(nopAppender{})
if testTarget.status.State() != Unhealthy {
t.Errorf("Expected target state %v, actual: %v", Unhealthy, testTarget.status.State())
if testTarget.status.Health() != HealthBad {
t.Errorf("Expected target state %v, actual: %v", HealthBad, testTarget.status.Health())
}
}
@ -73,8 +69,8 @@ func TestTargetScrapeWithFullChannel(t *testing.T) {
testTarget := newTestTarget(server.URL, 10*time.Millisecond, clientmodel.LabelSet{"dings": "bums"})
testTarget.scrape(slowAppender{})
if testTarget.status.State() != Unhealthy {
t.Errorf("Expected target state %v, actual: %v", Unhealthy, testTarget.status.State())
if testTarget.status.Health() != HealthBad {
t.Errorf("Expected target state %v, actual: %v", HealthBad, testTarget.status.Health())
}
if testTarget.status.LastError() != errIngestChannelFull {
t.Errorf("Expected target error %q, actual: %q", errIngestChannelFull, testTarget.status.LastError())
@ -86,7 +82,8 @@ func TestTargetRecordScrapeHealth(t *testing.T) {
now := clientmodel.Now()
appender := &collectResultAppender{}
testTarget.recordScrapeHealth(appender, now, 2*time.Second)
testTarget.status.setLastError(nil)
recordScrapeHealth(appender, now, testTarget.BaseLabels(), testTarget.status.Health(), 2*time.Second)
result := appender.result
@ -138,13 +135,13 @@ func TestTargetScrapeTimeout(t *testing.T) {
)
defer server.Close()
var testTarget Target = newTestTarget(server.URL, 10*time.Millisecond, clientmodel.LabelSet{})
testTarget := newTestTarget(server.URL, 10*time.Millisecond, clientmodel.LabelSet{})
appender := nopAppender{}
// scrape once without timeout
signal <- true
if err := testTarget.(*target).scrape(appender); err != nil {
if err := testTarget.scrape(appender); err != nil {
t.Fatal(err)
}
@ -153,12 +150,12 @@ func TestTargetScrapeTimeout(t *testing.T) {
// now scrape again
signal <- true
if err := testTarget.(*target).scrape(appender); err != nil {
if err := testTarget.scrape(appender); err != nil {
t.Fatal(err)
}
// now timeout
if err := testTarget.(*target).scrape(appender); err == nil {
if err := testTarget.scrape(appender); err == nil {
t.Fatal("expected scrape to timeout")
} else {
signal <- true // let handler continue
@ -166,7 +163,7 @@ func TestTargetScrapeTimeout(t *testing.T) {
// now scrape again without timeout
signal <- true
if err := testTarget.(*target).scrape(appender); err != nil {
if err := testTarget.scrape(appender); err != nil {
t.Fatal(err)
}
}
@ -224,28 +221,26 @@ func BenchmarkScrape(b *testing.B) {
)
defer server.Close()
var testTarget Target = newTestTarget(server.URL, 100*time.Millisecond, clientmodel.LabelSet{"dings": "bums"})
testTarget := newTestTarget(server.URL, 100*time.Millisecond, clientmodel.LabelSet{"dings": "bums"})
appender := nopAppender{}
b.ResetTimer()
for i := 0; i < b.N; i++ {
if err := testTarget.(*target).scrape(appender); err != nil {
if err := testTarget.scrape(appender); err != nil {
b.Fatal(err)
}
}
}
func newTestTarget(targetURL string, deadline time.Duration, baseLabels clientmodel.LabelSet) *target {
t := &target{
func newTestTarget(targetURL string, deadline time.Duration, baseLabels clientmodel.LabelSet) *Target {
t := &Target{
url: &url.URL{
Scheme: "http",
Host: strings.TrimLeft(targetURL, "http://"),
Path: "/metrics",
},
deadline: deadline,
status: &TargetStatus{
state: Healthy,
},
deadline: deadline,
status: &TargetStatus{},
scrapeInterval: 1 * time.Millisecond,
httpClient: utility.NewDeadlineClient(deadline),
scraperStopping: make(chan struct{}),

View file

@ -56,7 +56,7 @@ type TargetManager struct {
running bool
// Targets by their source ID.
targets map[string][]Target
targets map[string][]*Target
// Providers by the scrape configs they are derived from.
providers map[*config.ScrapeConfig][]TargetProvider
}
@ -65,7 +65,7 @@ type TargetManager struct {
func NewTargetManager(sampleAppender storage.SampleAppender) *TargetManager {
tm := &TargetManager{
sampleAppender: sampleAppender,
targets: make(map[string][]Target),
targets: make(map[string][]*Target),
}
return tm
}
@ -165,7 +165,7 @@ func (tm *TargetManager) removeTargets(f func(string) bool) {
}
wg.Add(len(targets))
for _, target := range targets {
go func(t Target) {
go func(t *Target) {
t.StopScraper()
wg.Done()
}(target)
@ -197,7 +197,7 @@ func (tm *TargetManager) updateTargetGroup(tgroup *config.TargetGroup, cfg *conf
// Replace the old targets with the new ones while keeping the state
// of intersecting targets.
for i, tnew := range newTargets {
var match Target
var match *Target
for j, told := range oldTargets {
if told == nil {
continue
@ -214,7 +214,7 @@ func (tm *TargetManager) updateTargetGroup(tgroup *config.TargetGroup, cfg *conf
// Updating is blocked during a scrape. We don't want those wait times
// to build up.
wg.Add(1)
go func(t Target) {
go func(t *Target) {
match.Update(cfg, t.fullLabels())
wg.Done()
}(tnew)
@ -227,7 +227,7 @@ func (tm *TargetManager) updateTargetGroup(tgroup *config.TargetGroup, cfg *conf
for _, told := range oldTargets {
if told != nil {
wg.Add(1)
go func(t Target) {
go func(t *Target) {
t.StopScraper()
wg.Done()
}(told)
@ -250,11 +250,11 @@ func (tm *TargetManager) updateTargetGroup(tgroup *config.TargetGroup, cfg *conf
}
// Pools returns the targets currently being scraped bucketed by their job name.
func (tm *TargetManager) Pools() map[string][]Target {
func (tm *TargetManager) Pools() map[string][]*Target {
tm.m.RLock()
defer tm.m.RUnlock()
pools := map[string][]Target{}
pools := map[string][]*Target{}
for _, ts := range tm.targets {
for _, t := range ts {
@ -287,11 +287,11 @@ func (tm *TargetManager) ApplyConfig(cfg *config.Config) {
}
// targetsFromGroup builds targets based on the given TargetGroup and config.
func (tm *TargetManager) targetsFromGroup(tg *config.TargetGroup, cfg *config.ScrapeConfig) ([]Target, error) {
func (tm *TargetManager) targetsFromGroup(tg *config.TargetGroup, cfg *config.ScrapeConfig) ([]*Target, error) {
tm.m.RLock()
defer tm.m.RUnlock()
targets := make([]Target, 0, len(tg.Targets))
targets := make([]*Target, 0, len(tg.Targets))
for i, labels := range tg.Targets {
addr := string(labels[clientmodel.AddressLabel])
// If no port was provided, infer it based on the used scheme.

View file

@ -45,7 +45,7 @@ func TestTargetManagerChan(t *testing.T) {
providers: map[*config.ScrapeConfig][]TargetProvider{
testJob1: []TargetProvider{prov1},
},
targets: make(map[string][]Target),
targets: make(map[string][]*Target),
}
go targetManager.Run()
defer targetManager.Stop()

View file

@ -32,18 +32,18 @@ type PrometheusStatusHandler struct {
Flags map[string]string
RuleManager *rules.Manager
TargetPools func() map[string][]retrieval.Target
TargetPools func() map[string][]*retrieval.Target
Birth time.Time
PathPrefix string
}
// TargetStateToClass returns a map of TargetState to the name of a Bootstrap CSS class.
func (h *PrometheusStatusHandler) TargetStateToClass() map[retrieval.TargetState]string {
return map[retrieval.TargetState]string{
retrieval.Unknown: "warning",
retrieval.Healthy: "success",
retrieval.Unhealthy: "danger",
// TargetHealthToClass returns a map of TargetHealth to the name of a Bootstrap CSS class.
func (h *PrometheusStatusHandler) TargetHealthToClass() map[retrieval.TargetHealth]string {
return map[retrieval.TargetHealth]string{
retrieval.HealthUnknown: "warning",
retrieval.HealthBad: "success",
retrieval.HealthGood: "danger",
}
}

View file

@ -32,7 +32,6 @@
<h2>Targets</h2>
<table class="table table-condensed table-bordered table-striped table-hover">
{{$stateToClass := .TargetStateToClass}}
{{range $job, $pool := call .TargetPools}}
<thead>
<tr><th colspan="5" class="job_header">{{$job}}</th></tr>
@ -51,7 +50,7 @@
<a href="{{.URL | globalURL}}">{{.URL}}</a>
</td>
<td>
<span class="alert alert-{{index $stateToClass .Status.State}} target_status_alert">
<span class="alert alert-{{index .TargetHealthToClass .Status.State}} target_status_alert">
{{.Status.State}}
</span>
</td>

View file

@ -193,7 +193,7 @@ func getTemplate(name string, pathPrefix string) (*template.Template, error) {
file, err = getTemplateFile(name)
if err != nil {
glog.Error("Could not read template %d: ", name, err)
glog.Error("Could not read template %s: %s", name, err)
return nil, err
}
t, err = t.Parse(file)