diff --git a/retrieval/deadline_client.go b/retrieval/deadline_client.go
new file mode 100644
index 000000000..5bed4a833
--- /dev/null
+++ b/retrieval/deadline_client.go
@@ -0,0 +1,39 @@
+package retrieval
+
+import (
+	"net"
+	"net/http"
+	"time"
+)
+
+// NewDeadlineClient returns a new http.Client which will time out long running
+// requests.
+//
+// The timeout bounds dialing and is also installed as an absolute deadline on
+// the established connection, measured from the moment dialing started.
+func NewDeadlineClient(timeout time.Duration) http.Client {
+	return http.Client{
+		Transport: &http.Transport{
+			// We need to disable keepalive, because we set a deadline on the
+			// underlying connection.
+			DisableKeepAlives: true,
+			Dial: func(netw, addr string) (c net.Conn, err error) {
+				start := time.Now()
+
+				c, err = net.DialTimeout(netw, addr, timeout)
+				if err != nil {
+					return nil, err
+				}
+
+				// Without the deadline the request would no longer be bounded
+				// by timeout, so a failure to install it fails the dial.
+				if err = c.SetDeadline(start.Add(timeout)); err != nil {
+					c.Close() // Best effort; the dial is reported as failed.
+					return nil, err
+				}
+
+				return c, nil
+			},
+		},
+	}
+}
diff --git a/retrieval/target.go b/retrieval/target.go
index 5a40c2ecb..47d864c35 100644
--- a/retrieval/target.go
+++ b/retrieval/target.go
@@ -117,6 +117,7 @@ type target struct {
 	Deadline time.Duration
 	// Any base labels that are added to this target and its metrics.
 	baseLabels model.LabelSet
+	client     http.Client
 }
 
 // Furnish a reasonably configured target for querying.
@@ -125,6 +126,7 @@ func NewTarget(address string, deadline time.Duration, baseLabels model.LabelSet
 		address:    address,
 		Deadline:   deadline,
 		baseLabels: baseLabels,
+		client:     NewDeadlineClient(deadline),
 	}
 
 	scheduler := &healthScheduler{
@@ -162,77 +164,58 @@ func (t *target) recordScrapeHealth(results chan format.Result, timestamp time.T
 
 func (t *target) Scrape(earliest time.Time, results chan format.Result) (err error) {
 	now := time.Now()
+	futureState := t.state
 
-	defer func() {
-		futureState := t.state
-
-		switch err {
-		case nil:
-			t.recordScrapeHealth(results, now, true)
-			futureState = ALIVE
-		default:
-			t.recordScrapeHealth(results, now, false)
-			futureState = UNREACHABLE
-		}
-
-		t.scheduler.Reschedule(earliest, futureState)
-		t.state = futureState
-	}()
-
-	done := make(chan bool)
-
-	go func(start time.Time) {
-		defer func() {
-			ms := float64(time.Since(start)) / float64(time.Millisecond)
-			labels := map[string]string{address: t.Address(), outcome: success}
-			if err != nil {
-				labels[outcome] = failure
-			}
-
-			targetOperationLatencies.Add(labels, ms)
-			targetOperations.Increment(labels)
-		}()
-
-		defer func() {
-			done <- true
-		}()
-
-		var resp *http.Response // Don't shadow "err" from the enclosing function.
-		resp, err = http.Get(t.Address())
-		if err != nil {
-			return
-		}
-
-		defer resp.Body.Close()
-
-		processor, err := format.DefaultRegistry.ProcessorForRequestHeader(resp.Header)
-		if err != nil {
-			return
-		}
-
-		// XXX: This is a wart; we need to handle this more gracefully down the
-		// road, especially once we have service discovery support.
-		baseLabels := model.LabelSet{model.InstanceLabel: model.LabelValue(t.Address())}
-		for baseLabel, baseValue := range t.baseLabels {
-			baseLabels[baseLabel] = baseValue
-		}
-
-		err = processor.Process(resp.Body, now, baseLabels, results)
-		if err != nil {
-			return
-		}
-	}(time.Now())
-
-	select {
-	case <-done:
-		break
-	case <-time.After(t.Deadline):
-		err = fmt.Errorf("Target %s exceeded %s deadline.", t, t.Deadline)
+	if err = t.scrape(now, results); err != nil {
+		t.recordScrapeHealth(results, now, false)
+		futureState = UNREACHABLE
+	} else {
+		t.recordScrapeHealth(results, now, true)
+		futureState = ALIVE
 	}
 
+	t.scheduler.Reschedule(earliest, futureState)
+	t.state = futureState
+
 	return
 }
 
+// scrape fetches the target once through t.client, selects a processor from
+// the response headers and hands it the body. err is a named result so that
+// the deferred telemetry closure can label the outcome as success or failure.
+func (t *target) scrape(timestamp time.Time, results chan format.Result) (err error) {
+	defer func(start time.Time) {
+		ms := float64(time.Since(start)) / float64(time.Millisecond)
+		labels := map[string]string{address: t.Address(), outcome: success}
+		if err != nil {
+			labels[outcome] = failure
+		}
+
+		targetOperationLatencies.Add(labels, ms)
+		targetOperations.Increment(labels)
+	}(time.Now())
+
+	resp, err := t.client.Get(t.Address())
+	if err != nil {
+		return
+	}
+	defer resp.Body.Close()
+
+	processor, err := format.DefaultRegistry.ProcessorForRequestHeader(resp.Header)
+	if err != nil {
+		return
+	}
+
+	// XXX: This is a wart; we need to handle this more gracefully down the
+	// road, especially once we have service discovery support.
+	baseLabels := model.LabelSet{model.InstanceLabel: model.LabelValue(t.Address())}
+	for baseLabel, baseValue := range t.baseLabels {
+		baseLabels[baseLabel] = baseValue
+	}
+
+	return processor.Process(resp.Body, timestamp, baseLabels, results)
+}
+
 func (t target) State() TargetState {
 	return t.state
 }
diff --git a/retrieval/target_test.go b/retrieval/target_test.go
index 8c8562f9a..d6b6a675d 100644
--- a/retrieval/target_test.go
+++ b/retrieval/target_test.go
@@ -16,6 +16,8 @@ package retrieval
 import (
 	"github.com/prometheus/prometheus/model"
 	"github.com/prometheus/prometheus/retrieval/format"
+	"net/http"
+	"net/http/httptest"
 	"testing"
 	"time"
 )
@@ -63,3 +65,46 @@ func TestTargetRecordScrapeHealth(t *testing.T) {
 		t.Fatalf("Expected and actual samples not equal. Expected: %v, actual: %v", expected, actual)
 	}
 }
+
+// TestTargetScrapeTimeout checks that a scrape fails when the handler stalls
+// past the target deadline and that the following scrape succeeds again.
+func TestTargetScrapeTimeout(t *testing.T) {
+	signal := make(chan bool, 1)
+	server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		<-signal
+		w.Header().Set("X-Prometheus-API-Version", "0.0.1")
+		w.Write([]byte(`[]`))
+	}))
+
+	defer server.Close()
+
+	testTarget := NewTarget(server.URL, 10*time.Millisecond, model.LabelSet{})
+	results := make(chan format.Result, 1024)
+
+	// scrape once without timeout
+	signal <- true
+	if err := testTarget.Scrape(time.Now(), results); err != nil {
+		t.Fatal(err)
+	}
+
+	// let the deadline lapse
+	time.Sleep(15 * time.Millisecond)
+
+	// now scrape again
+	signal <- true
+	if err := testTarget.Scrape(time.Now(), results); err != nil {
+		t.Fatal(err)
+	}
+
+	// now timeout
+	if err := testTarget.Scrape(time.Now(), results); err == nil {
+		t.Fatal("expected scrape to timeout")
+	}
+	signal <- true // let handler continue
+
+	// now scrape again without timeout
+	signal <- true
+	if err := testTarget.Scrape(time.Now(), results); err != nil {
+		t.Fatal(err)
+	}
+}