From 4f8673aa883438f3283627be12b6b781c0436f60 Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Sun, 19 Apr 2015 08:39:33 +0200 Subject: [PATCH] Simplify update sync for targets, format config fixtures. --- config/fixtures/invalid_label_name.conf.input | 8 +- .../fixtures/invalid_proto_format.conf.input | 8 +- .../invalid_scrape_interval.conf.input | 8 +- config/fixtures/minimal.conf.input | 18 ++-- config/fixtures/sample.conf.input | 54 ++++++------ retrieval/target.go | 82 +++++++------------ retrieval/targetpool_test.go | 11 +-- 7 files changed, 79 insertions(+), 110 deletions(-) diff --git a/config/fixtures/invalid_label_name.conf.input b/config/fixtures/invalid_label_name.conf.input index bfc384bd0..f85538649 100644 --- a/config/fixtures/invalid_label_name.conf.input +++ b/config/fixtures/invalid_label_name.conf.input @@ -1,10 +1,10 @@ global < - scrape_interval: "30s" - evaluation_interval: "30s" - labels: < + scrape_interval: "30s" + evaluation_interval: "30s" + labels: < label: < name: "monitor-test" value: "test" > - > + > > diff --git a/config/fixtures/invalid_proto_format.conf.input b/config/fixtures/invalid_proto_format.conf.input index 9d15ddec1..ba311005b 100644 --- a/config/fixtures/invalid_proto_format.conf.input +++ b/config/fixtures/invalid_proto_format.conf.input @@ -1,11 +1,11 @@ global < - scrape_interval: "30s" - evaluation_interval: "30s" + scrape_interval: "30s" + evaluation_interval: "30s" unknown_field: "foo" - labels: < + labels: < label: < name: "monitor" value: "test" > - > + > > diff --git a/config/fixtures/invalid_scrape_interval.conf.input b/config/fixtures/invalid_scrape_interval.conf.input index e9274948a..537d50996 100644 --- a/config/fixtures/invalid_scrape_interval.conf.input +++ b/config/fixtures/invalid_scrape_interval.conf.input @@ -1,10 +1,10 @@ global < - scrape_interval: "30" - evaluation_interval: "30s" - labels: < + scrape_interval: "30" + evaluation_interval: "30s" + labels: < label: < name: "monitor" value: "test" > - > + > > diff --git a/config/fixtures/minimal.conf.input b/config/fixtures/minimal.conf.input index 135c316a7..c08f51009 100644 --- a/config/fixtures/minimal.conf.input +++ b/config/fixtures/minimal.conf.input @@ -1,20 +1,20 @@ global < - scrape_interval: "30s" - evaluation_interval: "30s" - labels: < + scrape_interval: "30s" + evaluation_interval: "30s" + labels: < label: < name: "monitor" value: "test" > - > - rule_file: "prometheus.rules" + > + rule_file: "prometheus.rules" > job: < - name: "prometheus" - scrape_interval: "15s" + name: "prometheus" + scrape_interval: "15s" - target_group: < - target: "http://localhost:9090/metrics.json" + target_group: < + target: "http://localhost:9090/metrics.json" > > diff --git a/config/fixtures/sample.conf.input b/config/fixtures/sample.conf.input index 5c03d674d..1a4ec17bf 100644 --- a/config/fixtures/sample.conf.input +++ b/config/fixtures/sample.conf.input @@ -1,55 +1,55 @@ global < - scrape_interval: "30s" - evaluation_interval: "30s" - labels: < + scrape_interval: "30s" + evaluation_interval: "30s" + labels: < label: < name: "monitor" value: "test" > - > - rule_file: "prometheus.rules" + > + rule_file: "prometheus.rules" > job: < - name: "prometheus" - scrape_interval: "15s" + name: "prometheus" + scrape_interval: "15s" - target_group: < - target: "http://localhost:9090/metrics.json" - labels: < + target_group: < + target: "http://localhost:9090/metrics.json" + labels: < label: < name: "group" value: "canary" > - > - > + > + > > job: < - name: "random" - scrape_interval: "30s" + name: "random" + scrape_interval: "30s" - target_group: < - target: "http://random.com:8080/metrics.json" + target_group: < + target: "http://random.com:8080/metrics.json" target: "http://random.com:8081/metrics.json" - target: "http://random.com:8082/metrics.json" - target: "http://random.com:8083/metrics.json" - target: "http://random.com:8084/metrics.json" - labels: < + target: "http://random.com:8082/metrics.json" + target: "http://random.com:8083/metrics.json" + target: "http://random.com:8084/metrics.json" + labels: < label: < name: "group" value: "production" > - > - > - target_group: < - target: "http://random.com:8085/metrics.json" + > + > + target_group: < + target: "http://random.com:8085/metrics.json" target: "http://random.com:8086/metrics.json" - labels: < + labels: < label: < name: "group" value: "canary" > - > - > + > + > > diff --git a/retrieval/target.go b/retrieval/target.go index 6cdfedf07..dba7f0b8e 100644 --- a/retrieval/target.go +++ b/retrieval/target.go @@ -109,6 +109,8 @@ const ( // // Target implements extraction.Ingester. type Target interface { + extraction.Ingester + // Return the last encountered scrape error, if any. LastError() error // Return the health of the target. @@ -129,18 +131,13 @@ type Target interface { // Return the target's base labels without job and instance label. That's // useful for display purposes. BaseLabelsWithoutJobAndInstance() clientmodel.LabelSet - // SetBaseLabelsFrom queues a replacement of the current base labels by - // the labels of the given target. The method returns immediately after - // queuing. The actual replacement of the base labels happens - // asynchronously (but most likely before the next scrape for the target - // begins). + // SetBaseLabelsFrom sets the target's base labels to the base labels + // of the provided target. SetBaseLabelsFrom(Target) // Scrape target at the specified interval. RunScraper(storage.SampleAppender, time.Duration) // Stop scraping, synchronous. StopScraper() - // Ingest implements extraction.Ingester. - Ingest(clientmodel.Samples) error } // target is a Target that refers to a singular HTTP or HTTPS endpoint. @@ -155,8 +152,6 @@ type target struct { scraperStopping chan struct{} // Closing scraperStopped signals that scraping has been stopped. scraperStopped chan struct{} - // Channel to queue base labels to be replaced. - newBaseLabels chan clientmodel.LabelSet // Channel to buffer ingested samples. ingestedSamples chan clientmodel.Samples @@ -168,12 +163,8 @@ type target struct { // The HTTP client used to scrape the target's endpoint. httpClient *http.Client - // Mutex protects lastError, lastScrape, state, and baseLabels. Writing - // the above must only happen in the goroutine running the RunScraper - // loop, and it must happen under the lock. In that way, no mutex lock - // is required for reading the above in the goroutine running the - // RunScraper loop, but only for reading in other goroutines. - sync.Mutex + // Mutex protects lastError, lastScrape, state, and baseLabels. + sync.RWMutex } // NewTarget creates a reasonably configured target for querying. @@ -184,7 +175,6 @@ func NewTarget(url string, deadline time.Duration, baseLabels clientmodel.LabelS httpClient: utility.NewDeadlineClient(deadline), scraperStopping: make(chan struct{}), scraperStopped: make(chan struct{}), - newBaseLabels: make(chan clientmodel.LabelSet, 1), } t.baseLabels = clientmodel.LabelSet{InstanceLabel: clientmodel.LabelValue(t.InstanceIdentifier())} for baseLabel, baseValue := range baseLabels { @@ -213,18 +203,7 @@ func (t *target) Ingest(s clientmodel.Samples) error { // RunScraper implements Target. func (t *target) RunScraper(sampleAppender storage.SampleAppender, interval time.Duration) { - defer func() { - // Need to drain t.newBaseLabels to not make senders block during shutdown. - for { - select { - case <-t.newBaseLabels: - // Do nothing. - default: - close(t.scraperStopped) - return - } - } - }() + defer close(t.scraperStopped) jitterTimer := time.NewTimer(time.Duration(float64(interval) * rand.Float64())) select { @@ -245,31 +224,22 @@ func (t *target) RunScraper(sampleAppender storage.SampleAppender, interval time // Explanation of the contraption below: // - // In case t.newBaseLabels or t.scraperStopping have something to receive, - // we want to read from those channels rather than starting a new scrape - // (which might take very long). That's why the outer select has no - // ticker.C. Should neither t.newBaseLabels nor t.scraperStopping have - // anything to receive, we go into the inner select, where ticker.C is - // in the mix. + // In case t.scraperStopping has something to receive, we want to read + // from that channel rather than starting a new scrape (which might take very + // long). That's why the outer select has no ticker.C. Should t.scraperStopping + // not have anything to receive, we go into the inner select, where ticker.C + // is in the mix. for { select { - case newBaseLabels := <-t.newBaseLabels: - t.Lock() // Writing t.baseLabels requires the lock. - t.baseLabels = newBaseLabels - t.Unlock() case <-t.scraperStopping: return default: select { - case newBaseLabels := <-t.newBaseLabels: - t.Lock() // Writing t.baseLabels requires the lock. - t.baseLabels = newBaseLabels - t.Unlock() case <-t.scraperStopping: return case <-ticker.C: - took := time.Since(t.lastScrape) t.Lock() // Write t.lastScrape requires locking. + took := time.Since(t.lastScrape) t.lastScrape = time.Now() t.Unlock() targetIntervalLength.WithLabelValues(interval.String()).Observe( @@ -290,8 +260,13 @@ func (t *target) StopScraper() { const acceptHeader = `application/vnd.google.protobuf;proto=io.prometheus.client.MetricFamily;encoding=delimited;q=0.7,text/plain;version=0.0.4;q=0.3,application/json;schema="prometheus/telemetry";version=0.0.2;q=0.2,*/*;q=0.1` func (t *target) scrape(sampleAppender storage.SampleAppender) (err error) { + t.RLock() timestamp := clientmodel.Now() + defer func(start time.Time) { + t.recordScrapeHealth(sampleAppender, timestamp, err == nil, time.Since(start)) + t.RUnlock() + t.Lock() // Writing t.state and t.lastError requires the lock. if err == nil { t.state = Healthy @@ -300,7 +275,6 @@ func (t *target) scrape(sampleAppender storage.SampleAppender) (err error) { } t.lastError = err t.Unlock() - t.recordScrapeHealth(sampleAppender, timestamp, err == nil, time.Since(start)) }(time.Now()) req, err := http.NewRequest("GET", t.URL(), nil) @@ -344,22 +318,22 @@ func (t *target) scrape(sampleAppender storage.SampleAppender) (err error) { // LastError implements Target. func (t *target) LastError() error { - t.Lock() - defer t.Unlock() + t.RLock() + defer t.RUnlock() return t.lastError } // State implements Target. func (t *target) State() TargetState { - t.Lock() - defer t.Unlock() + t.RLock() + defer t.RUnlock() return t.state } // LastScrape implements Target. func (t *target) LastScrape() time.Time { - t.Lock() - defer t.Unlock() + t.RLock() + defer t.RUnlock() return t.lastScrape } @@ -406,8 +380,8 @@ func (t *target) GlobalURL() string { // BaseLabels implements Target. func (t *target) BaseLabels() clientmodel.LabelSet { - t.Lock() - defer t.Unlock() + t.RLock() + defer t.RUnlock() return t.baseLabels } @@ -427,7 +401,9 @@ func (t *target) SetBaseLabelsFrom(newTarget Target) { if t.URL() != newTarget.URL() { panic("targets don't refer to the same endpoint") } - t.newBaseLabels <- newTarget.BaseLabels() + t.Lock() + defer t.Unlock() + t.baseLabels = newTarget.BaseLabels() } func (t *target) recordScrapeHealth(sampleAppender storage.SampleAppender, timestamp clientmodel.Timestamp, healthy bool, scrapeDuration time.Duration) { diff --git a/retrieval/targetpool_test.go b/retrieval/targetpool_test.go index 2c9d3738e..56e9ea09b 100644 --- a/retrieval/targetpool_test.go +++ b/retrieval/targetpool_test.go @@ -17,8 +17,6 @@ import ( "net/http" "testing" "time" - - clientmodel "github.com/prometheus/client_golang/model" ) func testTargetPool(t testing.TB) { @@ -84,9 +82,8 @@ func testTargetPool(t testing.TB) { for _, input := range scenario.inputs { target := target{ - url: input.url, - newBaseLabels: make(chan clientmodel.LabelSet, 1), - httpClient: &http.Client{}, + url: input.url, + httpClient: &http.Client{}, } pool.addTarget(&target) } @@ -118,7 +115,6 @@ func TestTargetPoolReplaceTargets(t *testing.T) { state: Unhealthy, scraperStopping: make(chan struct{}), scraperStopped: make(chan struct{}), - newBaseLabels: make(chan clientmodel.LabelSet, 1), httpClient: &http.Client{}, } oldTarget2 := &target{ @@ -126,7 +122,6 @@ func TestTargetPoolReplaceTargets(t *testing.T) { state: Unhealthy, scraperStopping: make(chan struct{}), scraperStopped: make(chan struct{}), - newBaseLabels: make(chan clientmodel.LabelSet, 1), httpClient: &http.Client{}, } newTarget1 := &target{ @@ -134,7 +129,6 @@ func TestTargetPoolReplaceTargets(t *testing.T) { state: Healthy, scraperStopping: make(chan struct{}), scraperStopped: make(chan struct{}), - newBaseLabels: make(chan clientmodel.LabelSet, 1), httpClient: &http.Client{}, } newTarget2 := &target{ @@ -142,7 +136,6 @@ func TestTargetPoolReplaceTargets(t *testing.T) { state: Healthy, scraperStopping: make(chan struct{}), scraperStopped: make(chan struct{}), - newBaseLabels: make(chan clientmodel.LabelSet, 1), httpClient: &http.Client{}, }