Extract HTTP client from Target.

The HTTP client is the same across all targets with the same
scrape configuration. Thus, this commit moves it into the scrape
pool.
This commit is contained in:
Fabian Reinartz 2016-02-28 19:21:50 +01:00
parent cf56e33030
commit 75681b691a
5 changed files with 126 additions and 149 deletions

View file

@ -15,13 +15,18 @@ package retrieval
import ( import (
"errors" "errors"
"fmt"
"io"
"net/http"
"sync" "sync"
"time" "time"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/expfmt"
"github.com/prometheus/common/log" "github.com/prometheus/common/log"
"github.com/prometheus/common/model" "github.com/prometheus/common/model"
"golang.org/x/net/context" "golang.org/x/net/context"
"golang.org/x/net/context/ctxhttp"
"github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/storage"
@ -70,13 +75,14 @@ func init() {
// scrapePool manages scrapes for sets of targets. // scrapePool manages scrapes for sets of targets.
type scrapePool struct { type scrapePool struct {
appender storage.SampleAppender appender storage.SampleAppender
config *config.ScrapeConfig
ctx context.Context ctx context.Context
mtx sync.RWMutex
config *config.ScrapeConfig
client *http.Client
// Targets and loops must always be synchronized to have the same // Targets and loops must always be synchronized to have the same
// set of fingerprints. // set of fingerprints.
mtx sync.RWMutex
targets map[model.Fingerprint]*Target targets map[model.Fingerprint]*Target
loops map[model.Fingerprint]loop loops map[model.Fingerprint]loop
@ -85,9 +91,15 @@ type scrapePool struct {
} }
func newScrapePool(cfg *config.ScrapeConfig, app storage.SampleAppender) *scrapePool { func newScrapePool(cfg *config.ScrapeConfig, app storage.SampleAppender) *scrapePool {
client, err := newHTTPClient(cfg)
if err != nil {
// Any errors that could occur here should be caught during config validation.
log.Errorf("Error creating HTTP client for job %q: %s", cfg.JobName, err)
}
return &scrapePool{ return &scrapePool{
appender: app, appender: app,
config: cfg, config: cfg,
client: client,
targets: map[model.Fingerprint]*Target{}, targets: map[model.Fingerprint]*Target{},
loops: map[model.Fingerprint]loop{}, loops: map[model.Fingerprint]loop{},
newLoop: newScrapeLoop, newLoop: newScrapeLoop,
@ -123,7 +135,13 @@ func (sp *scrapePool) reload(cfg *config.ScrapeConfig) {
sp.mtx.Lock() sp.mtx.Lock()
defer sp.mtx.Unlock() defer sp.mtx.Unlock()
client, err := newHTTPClient(cfg)
if err != nil {
// Any errors that could occur here should be caught during config validation.
log.Errorf("Error creating HTTP client for job %q: %s", cfg.JobName, err)
}
sp.config = cfg sp.config = cfg
sp.client = client
var ( var (
wg sync.WaitGroup wg sync.WaitGroup
@ -134,7 +152,8 @@ func (sp *scrapePool) reload(cfg *config.ScrapeConfig) {
for fp, oldLoop := range sp.loops { for fp, oldLoop := range sp.loops {
var ( var (
t = sp.targets[fp] t = sp.targets[fp]
newLoop = sp.newLoop(sp.ctx, t, sp.sampleAppender(t), sp.reportAppender(t)) s = &targetScraper{Target: t, client: sp.client}
newLoop = sp.newLoop(sp.ctx, s, sp.sampleAppender(t), sp.reportAppender(t))
) )
wg.Add(1) wg.Add(1)
@ -169,7 +188,8 @@ func (sp *scrapePool) sync(targets []*Target) {
fingerprints[fp] = struct{}{} fingerprints[fp] = struct{}{}
if _, ok := sp.targets[fp]; !ok { if _, ok := sp.targets[fp]; !ok {
l := sp.newLoop(sp.ctx, t, sp.sampleAppender(t), sp.reportAppender(t)) s := &targetScraper{Target: t, client: sp.client}
l := sp.newLoop(sp.ctx, s, sp.sampleAppender(t), sp.reportAppender(t))
sp.targets[fp] = t sp.targets[fp] = t
sp.loops[fp] = l sp.loops[fp] = l
@ -242,6 +262,57 @@ type scraper interface {
offset(interval time.Duration) time.Duration offset(interval time.Duration) time.Duration
} }
// targetScraper implements the scraper interface for a target.
type targetScraper struct {
// Embedded so targetScraper inherits the target's URL(), offset(),
// fingerprint(), etc., satisfying the scraper interface.
*Target
// HTTP client used for the scrape request. It is shared across all
// targets of the same scrape pool (set from sp.client in reload/sync).
client *http.Client
}
// acceptHeader advertises the exposition formats the scraper can decode,
// preferring protobuf-delimited MetricFamily, then text format 0.0.4.
const acceptHeader = `application/vnd.google.protobuf;proto=io.prometheus.client.MetricFamily;encoding=delimited;q=0.7,text/plain;version=0.0.4;q=0.3,application/json;schema="prometheus/telemetry";version=0.0.2;q=0.2,*/*;q=0.1`
// scrape performs a single scrape of the target's endpoint at timestamp ts.
// The request is bound to ctx (cancellation/timeout) via ctxhttp.Do.
// It returns all decoded samples, or an error if the request failed or the
// server responded with a non-200 status.
func (s *targetScraper) scrape(ctx context.Context, ts time.Time) (model.Samples, error) {
req, err := http.NewRequest("GET", s.URL().String(), nil)
if err != nil {
return nil, err
}
req.Header.Add("Accept", acceptHeader)
resp, err := ctxhttp.Do(ctx, s.client, req)
if err != nil {
return nil, err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("server returned HTTP status %s", resp.Status)
}
// Preallocate with rough capacities to limit growth copies on typical
// scrape sizes; decSamples is reused across Decode calls.
var (
allSamples = make(model.Samples, 0, 200)
decSamples = make(model.Vector, 0, 50)
)
// The decoder picks the concrete format from the response Content-Type;
// every decoded sample is stamped with the scrape timestamp ts.
sdec := expfmt.SampleDecoder{
Dec: expfmt.NewDecoder(resp.Body, expfmt.ResponseFormat(resp.Header)),
Opts: &expfmt.DecodeOptions{
Timestamp: model.TimeFromUnixNano(ts.UnixNano()),
},
}
// Decode until the stream is exhausted (io.EOF) or a real error occurs.
for {
if err = sdec.Decode(&decSamples); err != nil {
break
}
allSamples = append(allSamples, decSamples...)
decSamples = decSamples[:0]
}
if err == io.EOF {
// Set err to nil since it is used in the scrape health recording.
err = nil
}
return allSamples, err
}
// A loop can run and be stopped again. It must not be reused after it was stopped. // A loop can run and be stopped again. It must not be reused after it was stopped.
type loop interface { type loop interface {
run(interval, timeout time.Duration, errc chan<- error) run(interval, timeout time.Duration, errc chan<- error)

View file

@ -144,8 +144,8 @@ func TestScrapePoolReload(t *testing.T) {
t.Errorf("Expected scrape timeout %d but got %d", 2*time.Second, timeout) t.Errorf("Expected scrape timeout %d but got %d", 2*time.Second, timeout)
} }
mtx.Lock() mtx.Lock()
if !stopped[s.(*Target).fingerprint()] { if !stopped[s.(*targetScraper).fingerprint()] {
t.Errorf("Scrape loop for %v not stopped yet", s.(*Target)) t.Errorf("Scrape loop for %v not stopped yet", s.(*targetScraper))
} }
mtx.Unlock() mtx.Unlock()
} }

View file

@ -15,7 +15,6 @@ package retrieval
import ( import (
"fmt" "fmt"
"io"
"io/ioutil" "io/ioutil"
"net/http" "net/http"
"net/url" "net/url"
@ -23,10 +22,7 @@ import (
"sync" "sync"
"time" "time"
"github.com/prometheus/common/expfmt"
"github.com/prometheus/common/model" "github.com/prometheus/common/model"
"golang.org/x/net/context"
"golang.org/x/net/context/ctxhttp"
"github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/storage"
@ -121,7 +117,6 @@ type Target struct {
// The status object for the target. It is only set once on initialization. // The status object for the target. It is only set once on initialization.
status *TargetStatus status *TargetStatus
scrapeLoop *scrapeLoop
scrapeConfig *config.ScrapeConfig scrapeConfig *config.ScrapeConfig
// Mutex protects the members below. // Mutex protects the members below.
@ -131,25 +126,16 @@ type Target struct {
metaLabels model.LabelSet metaLabels model.LabelSet
// Any labels that are added to this target and its metrics. // Any labels that are added to this target and its metrics.
labels model.LabelSet labels model.LabelSet
// The HTTP client used to scrape the target's endpoint.
httpClient *http.Client
} }
// NewTarget creates a reasonably configured target for querying. // NewTarget creates a reasonably configured target for querying.
func NewTarget(cfg *config.ScrapeConfig, labels, metaLabels model.LabelSet) (*Target, error) { func NewTarget(cfg *config.ScrapeConfig, labels, metaLabels model.LabelSet) *Target {
client, err := newHTTPClient(cfg) return &Target{
if err != nil {
return nil, err
}
t := &Target{
status: &TargetStatus{}, status: &TargetStatus{},
scrapeConfig: cfg, scrapeConfig: cfg,
labels: labels, labels: labels,
metaLabels: metaLabels, metaLabels: metaLabels,
httpClient: client,
} }
return t, nil
} }
// Status returns the status of the target. // Status returns the status of the target.
@ -282,60 +268,6 @@ func (t *Target) URL() *url.URL {
} }
} }
// InstanceIdentifier returns the identifier for the target.
// It is the host (address) portion of the target's URL.
func (t *Target) InstanceIdentifier() string {
return t.host()
}
const acceptHeader = `application/vnd.google.protobuf;proto=io.prometheus.client.MetricFamily;encoding=delimited;q=0.7,text/plain;version=0.0.4;q=0.3,application/json;schema="prometheus/telemetry";version=0.0.2;q=0.2,*/*;q=0.1`
// scrape performs a single scrape of the target's endpoint at timestamp ts,
// using the target's own HTTP client. It returns all decoded samples, or an
// error if the request failed or the server responded with a non-200 status.
func (t *Target) scrape(ctx context.Context, ts time.Time) (model.Samples, error) {
// Hold the read lock only long enough to snapshot the client pointer;
// the request itself runs without the lock.
t.RLock()
client := t.httpClient
t.RUnlock()
req, err := http.NewRequest("GET", t.URL().String(), nil)
if err != nil {
return nil, err
}
req.Header.Add("Accept", acceptHeader)
resp, err := ctxhttp.Do(ctx, client, req)
if err != nil {
return nil, err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("server returned HTTP status %s", resp.Status)
}
// Rough preallocation for typical scrape sizes; decSamples is reused
// as a scratch vector across Decode calls.
var (
allSamples = make(model.Samples, 0, 200)
decSamples = make(model.Vector, 0, 50)
)
// Decoder format is negotiated from the response Content-Type header;
// all samples are stamped with the scrape timestamp ts.
sdec := expfmt.SampleDecoder{
Dec: expfmt.NewDecoder(resp.Body, expfmt.ResponseFormat(resp.Header)),
Opts: &expfmt.DecodeOptions{
Timestamp: model.TimeFromUnixNano(ts.UnixNano()),
},
}
// Decode until io.EOF (normal end of stream) or a real error.
for {
if err = sdec.Decode(&decSamples); err != nil {
break
}
allSamples = append(allSamples, decSamples...)
decSamples = decSamples[:0]
}
if err == io.EOF {
// Set err to nil since it is used in the scrape health recording.
err = nil
}
return allSamples, err
}
func (t *Target) report(start time.Time, dur time.Duration, err error) { func (t *Target) report(start time.Time, dur time.Duration, err error) {
t.status.setLastError(err) t.status.setLastError(err)
t.status.setLastScrape(start) t.status.setLastScrape(start)

View file

@ -16,19 +16,17 @@ package retrieval
import ( import (
"crypto/tls" "crypto/tls"
"crypto/x509" "crypto/x509"
"errors"
"fmt" "fmt"
"io/ioutil" "io/ioutil"
"net/http" "net/http"
"net/http/httptest" "net/http/httptest"
"net/url" // "net/url"
"reflect" "reflect"
"strings" "strings"
"testing" "testing"
"time" "time"
"github.com/prometheus/common/model" "github.com/prometheus/common/model"
"golang.org/x/net/context"
"github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/config"
) )
@ -92,71 +90,50 @@ func TestTargetOffset(t *testing.T) {
} }
} }
func TestTargetScrape404(t *testing.T) { // func TestTargetURLParams(t *testing.T) {
server := httptest.NewServer( // server := httptest.NewServer(
http.HandlerFunc( // http.HandlerFunc(
func(w http.ResponseWriter, r *http.Request) { // func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusNotFound) // w.Header().Set("Content-Type", `text/plain; version=0.0.4`)
}, // w.Write([]byte{})
), // r.ParseForm()
) // if r.Form["foo"][0] != "bar" {
defer server.Close() // t.Fatalf("URL parameter 'foo' had unexpected first value '%v'", r.Form["foo"][0])
// }
// if r.Form["foo"][1] != "baz" {
// t.Fatalf("URL parameter 'foo' had unexpected second value '%v'", r.Form["foo"][1])
// }
// },
// ),
// )
// defer server.Close()
// serverURL, err := url.Parse(server.URL)
// if err != nil {
// t.Fatal(err)
// }
testTarget := newTestTarget(server.URL, time.Second, model.LabelSet{}) // target, err := NewTarget(
// &config.ScrapeConfig{
want := errors.New("server returned HTTP status 404 Not Found") // JobName: "test_job1",
_, got := testTarget.scrape(context.Background(), time.Now()) // Scheme: "https",
if got == nil || want.Error() != got.Error() { // Params: url.Values{
t.Fatalf("want err %q, got %q", want, got) // "foo": []string{"bar", "baz"},
} // },
} // },
// model.LabelSet{
func TestURLParams(t *testing.T) { // model.SchemeLabel: model.LabelValue(serverURL.Scheme),
server := httptest.NewServer( // model.AddressLabel: model.LabelValue(serverURL.Host),
http.HandlerFunc( // "__param_foo": "bar_override",
func(w http.ResponseWriter, r *http.Request) { // },
w.Header().Set("Content-Type", `text/plain; version=0.0.4`) // nil,
w.Write([]byte{}) // )
r.ParseForm() // if err != nil {
if r.Form["foo"][0] != "bar" { // t.Fatal(err)
t.Fatalf("URL parameter 'foo' had unexpected first value '%v'", r.Form["foo"][0]) // }
} // if _, err = target.scrape(context.Background(), time.Now()); err != nil {
if r.Form["foo"][1] != "baz" { // t.Fatal(err)
t.Fatalf("URL parameter 'foo' had unexpected second value '%v'", r.Form["foo"][1]) // }
} // }
},
),
)
defer server.Close()
serverURL, err := url.Parse(server.URL)
if err != nil {
t.Fatal(err)
}
target, err := NewTarget(
&config.ScrapeConfig{
JobName: "test_job1",
ScrapeInterval: model.Duration(1 * time.Minute),
ScrapeTimeout: model.Duration(1 * time.Second),
Scheme: serverURL.Scheme,
Params: url.Values{
"foo": []string{"bar", "baz"},
},
},
model.LabelSet{
model.SchemeLabel: model.LabelValue(serverURL.Scheme),
model.AddressLabel: model.LabelValue(serverURL.Host),
"__param_foo": "bar",
},
nil,
)
if err != nil {
t.Fatal(err)
}
if _, err = target.scrape(context.Background(), time.Now()); err != nil {
t.Fatal(err)
}
}
func newTestTarget(targetURL string, deadline time.Duration, labels model.LabelSet) *Target { func newTestTarget(targetURL string, deadline time.Duration, labels model.LabelSet) *Target {
labels = labels.Clone() labels = labels.Clone()
@ -343,7 +320,7 @@ func newTLSConfig(t *testing.T) *tls.Config {
return tlsConfig return tlsConfig
} }
func TestNewTargetWithBadTLSConfig(t *testing.T) { func TestNewClientWithBadTLSConfig(t *testing.T) {
cfg := &config.ScrapeConfig{ cfg := &config.ScrapeConfig{
ScrapeTimeout: model.Duration(1 * time.Second), ScrapeTimeout: model.Duration(1 * time.Second),
TLSConfig: config.TLSConfig{ TLSConfig: config.TLSConfig{
@ -352,7 +329,7 @@ func TestNewTargetWithBadTLSConfig(t *testing.T) {
KeyFile: "testdata/nonexistent_client.key", KeyFile: "testdata/nonexistent_client.key",
}, },
} }
_, err := NewTarget(cfg, nil, nil) _, err := newHTTPClient(cfg)
if err == nil { if err == nil {
t.Fatalf("Expected error, got nil.") t.Fatalf("Expected error, got nil.")
} }

View file

@ -459,11 +459,8 @@ func targetsFromGroup(tg *config.TargetGroup, cfg *config.ScrapeConfig) (map[mod
delete(labels, ln) delete(labels, ln)
} }
} }
tr, err := NewTarget(cfg, labels, preRelabelLabels)
if err != nil {
return nil, fmt.Errorf("error while creating instance %d in target group %s: %s", i, tg, err)
}
tr := NewTarget(cfg, labels, preRelabelLabels)
targets[tr.fingerprint()] = tr targets[tr.fingerprint()] = tr
} }