mirror of
https://github.com/prometheus/prometheus.git
synced 2024-11-09 23:24:05 -08:00
Target uses HTTP transport with deadlines
Instead of externally handling timeouts when scraping a target, we set timeouts on the HTTP connection. This ensures that we don't leak goroutines on timeouts. [fixes #181]
This commit is contained in:
parent
d46cd089b5
commit
3929582892
30
retrieval/deadline_client.go
Normal file
30
retrieval/deadline_client.go
Normal file
|
@ -0,0 +1,30 @@
|
|||
package retrieval
|
||||
|
||||
import (
|
||||
"net"
|
||||
"net/http"
|
||||
"time"
|
||||
)
|
||||
|
||||
// NewDeadlineClient returns a new http.Client which will time out long running
|
||||
// requests.
|
||||
func NewDeadlineClient(timeout time.Duration) http.Client {
|
||||
return http.Client{
|
||||
Transport: &http.Transport{
|
||||
// We need to disable keepalive, becasue we set a deadline on the
|
||||
// underlying connection.
|
||||
DisableKeepAlives: true,
|
||||
Dial: func(netw, addr string) (c net.Conn, err error) {
|
||||
start := time.Now()
|
||||
|
||||
c, err = net.DialTimeout(netw, addr, timeout)
|
||||
|
||||
if err == nil {
|
||||
c.SetDeadline(start.Add(timeout))
|
||||
}
|
||||
|
||||
return
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
|
@ -117,6 +117,7 @@ type target struct {
|
|||
Deadline time.Duration
|
||||
// Any base labels that are added to this target and its metrics.
|
||||
baseLabels model.LabelSet
|
||||
client http.Client
|
||||
}
|
||||
|
||||
// Furnish a reasonably configured target for querying.
|
||||
|
@ -125,6 +126,7 @@ func NewTarget(address string, deadline time.Duration, baseLabels model.LabelSet
|
|||
address: address,
|
||||
Deadline: deadline,
|
||||
baseLabels: baseLabels,
|
||||
client: NewDeadlineClient(deadline),
|
||||
}
|
||||
|
||||
scheduler := &healthScheduler{
|
||||
|
@ -162,77 +164,55 @@ func (t *target) recordScrapeHealth(results chan format.Result, timestamp time.T
|
|||
|
||||
func (t *target) Scrape(earliest time.Time, results chan format.Result) (err error) {
|
||||
now := time.Now()
|
||||
futureState := t.state
|
||||
|
||||
defer func() {
|
||||
futureState := t.state
|
||||
|
||||
switch err {
|
||||
case nil:
|
||||
t.recordScrapeHealth(results, now, true)
|
||||
futureState = ALIVE
|
||||
default:
|
||||
t.recordScrapeHealth(results, now, false)
|
||||
futureState = UNREACHABLE
|
||||
}
|
||||
|
||||
t.scheduler.Reschedule(earliest, futureState)
|
||||
t.state = futureState
|
||||
}()
|
||||
|
||||
done := make(chan bool)
|
||||
|
||||
go func(start time.Time) {
|
||||
defer func() {
|
||||
ms := float64(time.Since(start)) / float64(time.Millisecond)
|
||||
labels := map[string]string{address: t.Address(), outcome: success}
|
||||
if err != nil {
|
||||
labels[outcome] = failure
|
||||
}
|
||||
|
||||
targetOperationLatencies.Add(labels, ms)
|
||||
targetOperations.Increment(labels)
|
||||
}()
|
||||
|
||||
defer func() {
|
||||
done <- true
|
||||
}()
|
||||
|
||||
var resp *http.Response // Don't shadow "err" from the enclosing function.
|
||||
resp, err = http.Get(t.Address())
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
defer resp.Body.Close()
|
||||
|
||||
processor, err := format.DefaultRegistry.ProcessorForRequestHeader(resp.Header)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
// XXX: This is a wart; we need to handle this more gracefully down the
|
||||
// road, especially once we have service discovery support.
|
||||
baseLabels := model.LabelSet{model.InstanceLabel: model.LabelValue(t.Address())}
|
||||
for baseLabel, baseValue := range t.baseLabels {
|
||||
baseLabels[baseLabel] = baseValue
|
||||
}
|
||||
|
||||
err = processor.Process(resp.Body, now, baseLabels, results)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
}(time.Now())
|
||||
|
||||
select {
|
||||
case <-done:
|
||||
break
|
||||
case <-time.After(t.Deadline):
|
||||
err = fmt.Errorf("Target %s exceeded %s deadline.", t, t.Deadline)
|
||||
if err = t.scrape(now, results); err != nil {
|
||||
t.recordScrapeHealth(results, now, false)
|
||||
futureState = UNREACHABLE
|
||||
} else {
|
||||
t.recordScrapeHealth(results, now, true)
|
||||
futureState = ALIVE
|
||||
}
|
||||
|
||||
t.scheduler.Reschedule(earliest, futureState)
|
||||
t.state = futureState
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func (t *target) scrape(timestamp time.Time, results chan format.Result) (err error) {
|
||||
defer func(start time.Time) {
|
||||
ms := float64(time.Since(start)) / float64(time.Millisecond)
|
||||
labels := map[string]string{address: t.Address(), outcome: success}
|
||||
if err != nil {
|
||||
labels[outcome] = failure
|
||||
}
|
||||
|
||||
targetOperationLatencies.Add(labels, ms)
|
||||
targetOperations.Increment(labels)
|
||||
}(time.Now())
|
||||
|
||||
resp, err := t.client.Get(t.Address())
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
processor, err := format.DefaultRegistry.ProcessorForRequestHeader(resp.Header)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
// XXX: This is a wart; we need to handle this more gracefully down the
|
||||
// road, especially once we have service discovery support.
|
||||
baseLabels := model.LabelSet{model.InstanceLabel: model.LabelValue(t.Address())}
|
||||
for baseLabel, baseValue := range t.baseLabels {
|
||||
baseLabels[baseLabel] = baseValue
|
||||
}
|
||||
|
||||
return processor.Process(resp.Body, timestamp, baseLabels, results)
|
||||
}
|
||||
|
||||
// State returns the most recently recorded health state of the target.
// NOTE(review): this uses a value receiver while the other methods use
// *target, so it reads from a copy — confirm the asymmetry is intentional.
func (t target) State() TargetState {
	return t.state
}
|
||||
|
|
|
@ -16,6 +16,8 @@ package retrieval
|
|||
import (
|
||||
"github.com/prometheus/prometheus/model"
|
||||
"github.com/prometheus/prometheus/retrieval/format"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
@ -63,3 +65,45 @@ func TestTargetRecordScrapeHealth(t *testing.T) {
|
|||
t.Fatalf("Expected and actual samples not equal. Expected: %v, actual: %v", expected, actual)
|
||||
}
|
||||
}
|
||||
|
||||
func TestTargetScrapeTimeout(t *testing.T) {
|
||||
signal := make(chan bool, 1)
|
||||
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
<-signal
|
||||
w.Header().Set("X-Prometheus-API-Version", "0.0.1")
|
||||
w.Write([]byte(`[]`))
|
||||
}))
|
||||
|
||||
defer server.Close()
|
||||
|
||||
testTarget := NewTarget(server.URL, 10*time.Millisecond, model.LabelSet{})
|
||||
results := make(chan format.Result, 1024)
|
||||
|
||||
// scrape once without timeout
|
||||
signal <- true
|
||||
if err := testTarget.Scrape(time.Now(), results); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// let the deadline lapse
|
||||
time.Sleep(15*time.Millisecond)
|
||||
|
||||
// now scrape again
|
||||
signal <- true
|
||||
if err := testTarget.Scrape(time.Now(), results); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// now timeout
|
||||
if err := testTarget.Scrape(time.Now(), results); err == nil {
|
||||
t.Fatal("expected scrape to timeout")
|
||||
} else {
|
||||
signal <- true // let handler continue
|
||||
}
|
||||
|
||||
// now scrape again without timeout
|
||||
signal <- true
|
||||
if err := testTarget.Scrape(time.Now(), results); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue