Simplify update sync for targets, format config fixtures.

This commit is contained in:
Fabian Reinartz 2015-04-19 08:39:33 +02:00
parent fa42ed0e7e
commit 4f8673aa88
7 changed files with 79 additions and 110 deletions

View file

@ -109,6 +109,8 @@ const (
//
// Target implements extraction.Ingester.
type Target interface {
extraction.Ingester
// Return the last encountered scrape error, if any.
LastError() error
// Return the health of the target.
@ -129,18 +131,13 @@ type Target interface {
// Return the target's base labels without job and instance label. That's
// useful for display purposes.
BaseLabelsWithoutJobAndInstance() clientmodel.LabelSet
// SetBaseLabelsFrom queues a replacement of the current base labels by
// the labels of the given target. The method returns immediately after
// queuing. The actual replacement of the base labels happens
// asynchronously (but most likely before the next scrape for the target
// begins).
// SetBaseLabelsFrom sets the target's base labels to the base labels
// of the provided target.
SetBaseLabelsFrom(Target)
// Scrape target at the specified interval.
RunScraper(storage.SampleAppender, time.Duration)
// Stop scraping, synchronous.
StopScraper()
// Ingest implements extraction.Ingester.
Ingest(clientmodel.Samples) error
}
// target is a Target that refers to a singular HTTP or HTTPS endpoint.
@ -155,8 +152,6 @@ type target struct {
scraperStopping chan struct{}
// Closing scraperStopped signals that scraping has been stopped.
scraperStopped chan struct{}
// Channel to queue base labels to be replaced.
newBaseLabels chan clientmodel.LabelSet
// Channel to buffer ingested samples.
ingestedSamples chan clientmodel.Samples
@ -168,12 +163,8 @@ type target struct {
// The HTTP client used to scrape the target's endpoint.
httpClient *http.Client
// Mutex protects lastError, lastScrape, state, and baseLabels. Writing
// the above must only happen in the goroutine running the RunScraper
// loop, and it must happen under the lock. In that way, no mutex lock
// is required for reading the above in the goroutine running the
// RunScraper loop, but only for reading in other goroutines.
sync.Mutex
// Mutex protects lastError, lastScrape, state, and baseLabels.
sync.RWMutex
}
// NewTarget creates a reasonably configured target for querying.
@ -184,7 +175,6 @@ func NewTarget(url string, deadline time.Duration, baseLabels clientmodel.LabelS
httpClient: utility.NewDeadlineClient(deadline),
scraperStopping: make(chan struct{}),
scraperStopped: make(chan struct{}),
newBaseLabels: make(chan clientmodel.LabelSet, 1),
}
t.baseLabels = clientmodel.LabelSet{InstanceLabel: clientmodel.LabelValue(t.InstanceIdentifier())}
for baseLabel, baseValue := range baseLabels {
@ -213,18 +203,7 @@ func (t *target) Ingest(s clientmodel.Samples) error {
// RunScraper implements Target.
func (t *target) RunScraper(sampleAppender storage.SampleAppender, interval time.Duration) {
defer func() {
// Need to drain t.newBaseLabels to not make senders block during shutdown.
for {
select {
case <-t.newBaseLabels:
// Do nothing.
default:
close(t.scraperStopped)
return
}
}
}()
defer close(t.scraperStopped)
jitterTimer := time.NewTimer(time.Duration(float64(interval) * rand.Float64()))
select {
@ -245,31 +224,22 @@ func (t *target) RunScraper(sampleAppender storage.SampleAppender, interval time
// Explanation of the contraption below:
//
// In case t.newBaseLabels or t.scraperStopping have something to receive,
// we want to read from those channels rather than starting a new scrape
// (which might take very long). That's why the outer select has no
// ticker.C. Should neither t.newBaseLabels nor t.scraperStopping have
// anything to receive, we go into the inner select, where ticker.C is
// in the mix.
// In case t.scraperStopping has something to receive, we want to read
// from that channel rather than starting a new scrape (which might take very
// long). That's why the outer select has no ticker.C. Should t.scraperStopping
// not have anything to receive, we go into the inner select, where ticker.C
// is in the mix.
for {
select {
case newBaseLabels := <-t.newBaseLabels:
t.Lock() // Writing t.baseLabels requires the lock.
t.baseLabels = newBaseLabels
t.Unlock()
case <-t.scraperStopping:
return
default:
select {
case newBaseLabels := <-t.newBaseLabels:
t.Lock() // Writing t.baseLabels requires the lock.
t.baseLabels = newBaseLabels
t.Unlock()
case <-t.scraperStopping:
return
case <-ticker.C:
took := time.Since(t.lastScrape)
t.Lock() // Write t.lastScrape requires locking.
took := time.Since(t.lastScrape)
t.lastScrape = time.Now()
t.Unlock()
targetIntervalLength.WithLabelValues(interval.String()).Observe(
@ -290,8 +260,13 @@ func (t *target) StopScraper() {
const acceptHeader = `application/vnd.google.protobuf;proto=io.prometheus.client.MetricFamily;encoding=delimited;q=0.7,text/plain;version=0.0.4;q=0.3,application/json;schema="prometheus/telemetry";version=0.0.2;q=0.2,*/*;q=0.1`
func (t *target) scrape(sampleAppender storage.SampleAppender) (err error) {
t.RLock()
timestamp := clientmodel.Now()
defer func(start time.Time) {
t.recordScrapeHealth(sampleAppender, timestamp, err == nil, time.Since(start))
t.RUnlock()
t.Lock() // Writing t.state and t.lastError requires the lock.
if err == nil {
t.state = Healthy
@ -300,7 +275,6 @@ func (t *target) scrape(sampleAppender storage.SampleAppender) (err error) {
}
t.lastError = err
t.Unlock()
t.recordScrapeHealth(sampleAppender, timestamp, err == nil, time.Since(start))
}(time.Now())
req, err := http.NewRequest("GET", t.URL(), nil)
@ -344,22 +318,22 @@ func (t *target) scrape(sampleAppender storage.SampleAppender) (err error) {
// LastError implements Target.
func (t *target) LastError() error {
t.Lock()
defer t.Unlock()
t.RLock()
defer t.RUnlock()
return t.lastError
}
// State implements Target.
func (t *target) State() TargetState {
t.Lock()
defer t.Unlock()
t.RLock()
defer t.RUnlock()
return t.state
}
// LastScrape implements Target.
func (t *target) LastScrape() time.Time {
t.Lock()
defer t.Unlock()
t.RLock()
defer t.RUnlock()
return t.lastScrape
}
@ -406,8 +380,8 @@ func (t *target) GlobalURL() string {
// BaseLabels implements Target.
func (t *target) BaseLabels() clientmodel.LabelSet {
t.Lock()
defer t.Unlock()
t.RLock()
defer t.RUnlock()
return t.baseLabels
}
@ -427,7 +401,9 @@ func (t *target) SetBaseLabelsFrom(newTarget Target) {
if t.URL() != newTarget.URL() {
panic("targets don't refer to the same endpoint")
}
t.newBaseLabels <- newTarget.BaseLabels()
t.Lock()
defer t.Unlock()
t.baseLabels = newTarget.BaseLabels()
}
func (t *target) recordScrapeHealth(sampleAppender storage.SampleAppender, timestamp clientmodel.Timestamp, healthy bool, scrapeDuration time.Duration) {

View file

@ -17,8 +17,6 @@ import (
"net/http"
"testing"
"time"
clientmodel "github.com/prometheus/client_golang/model"
)
func testTargetPool(t testing.TB) {
@ -85,7 +83,6 @@ func testTargetPool(t testing.TB) {
for _, input := range scenario.inputs {
target := target{
url: input.url,
newBaseLabels: make(chan clientmodel.LabelSet, 1),
httpClient: &http.Client{},
}
pool.addTarget(&target)
@ -118,7 +115,6 @@ func TestTargetPoolReplaceTargets(t *testing.T) {
state: Unhealthy,
scraperStopping: make(chan struct{}),
scraperStopped: make(chan struct{}),
newBaseLabels: make(chan clientmodel.LabelSet, 1),
httpClient: &http.Client{},
}
oldTarget2 := &target{
@ -126,7 +122,6 @@ func TestTargetPoolReplaceTargets(t *testing.T) {
state: Unhealthy,
scraperStopping: make(chan struct{}),
scraperStopped: make(chan struct{}),
newBaseLabels: make(chan clientmodel.LabelSet, 1),
httpClient: &http.Client{},
}
newTarget1 := &target{
@ -134,7 +129,6 @@ func TestTargetPoolReplaceTargets(t *testing.T) {
state: Healthy,
scraperStopping: make(chan struct{}),
scraperStopped: make(chan struct{}),
newBaseLabels: make(chan clientmodel.LabelSet, 1),
httpClient: &http.Client{},
}
newTarget2 := &target{
@ -142,7 +136,6 @@ func TestTargetPoolReplaceTargets(t *testing.T) {
state: Healthy,
scraperStopping: make(chan struct{}),
scraperStopped: make(chan struct{}),
newBaseLabels: make(chan clientmodel.LabelSet, 1),
httpClient: &http.Client{},
}