mirror of
https://github.com/prometheus/prometheus.git
synced 2025-03-05 20:59:13 -08:00
Remove scrape config from Target.
This commit removes the scrapeConfig entirely from Target. All identity defining parameters are thus immutable now and the mutex can be removed.. Target identity is now correctly defined by the labels and the full URL. This in particular includes URL parameters that are not specified in the label set. Fingerprint is also removed from hash to remove an unnecessary tight coupling to the common/model package.
This commit is contained in:
parent
75681b691a
commit
0d7105abee
|
@ -82,9 +82,9 @@ type scrapePool struct {
|
||||||
config *config.ScrapeConfig
|
config *config.ScrapeConfig
|
||||||
client *http.Client
|
client *http.Client
|
||||||
// Targets and loops must always be synchronized to have the same
|
// Targets and loops must always be synchronized to have the same
|
||||||
// set of fingerprints.
|
// set of hashes.
|
||||||
targets map[model.Fingerprint]*Target
|
targets map[uint64]*Target
|
||||||
loops map[model.Fingerprint]loop
|
loops map[uint64]loop
|
||||||
|
|
||||||
// Constructor for new scrape loops. This is settable for testing convenience.
|
// Constructor for new scrape loops. This is settable for testing convenience.
|
||||||
newLoop func(context.Context, scraper, storage.SampleAppender, storage.SampleAppender) loop
|
newLoop func(context.Context, scraper, storage.SampleAppender, storage.SampleAppender) loop
|
||||||
|
@ -100,8 +100,8 @@ func newScrapePool(cfg *config.ScrapeConfig, app storage.SampleAppender) *scrape
|
||||||
appender: app,
|
appender: app,
|
||||||
config: cfg,
|
config: cfg,
|
||||||
client: client,
|
client: client,
|
||||||
targets: map[model.Fingerprint]*Target{},
|
targets: map[uint64]*Target{},
|
||||||
loops: map[model.Fingerprint]loop{},
|
loops: map[uint64]loop{},
|
||||||
newLoop: newScrapeLoop,
|
newLoop: newScrapeLoop,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -178,21 +178,21 @@ func (sp *scrapePool) sync(targets []*Target) {
|
||||||
defer sp.mtx.Unlock()
|
defer sp.mtx.Unlock()
|
||||||
|
|
||||||
var (
|
var (
|
||||||
fingerprints = map[model.Fingerprint]struct{}{}
|
uniqueTargets = map[uint64]struct{}{}
|
||||||
interval = time.Duration(sp.config.ScrapeInterval)
|
interval = time.Duration(sp.config.ScrapeInterval)
|
||||||
timeout = time.Duration(sp.config.ScrapeTimeout)
|
timeout = time.Duration(sp.config.ScrapeTimeout)
|
||||||
)
|
)
|
||||||
|
|
||||||
for _, t := range targets {
|
for _, t := range targets {
|
||||||
fp := t.fingerprint()
|
hash := t.hash()
|
||||||
fingerprints[fp] = struct{}{}
|
uniqueTargets[hash] = struct{}{}
|
||||||
|
|
||||||
if _, ok := sp.targets[fp]; !ok {
|
if _, ok := sp.targets[hash]; !ok {
|
||||||
s := &targetScraper{Target: t, client: sp.client}
|
s := &targetScraper{Target: t, client: sp.client}
|
||||||
l := sp.newLoop(sp.ctx, s, sp.sampleAppender(t), sp.reportAppender(t))
|
l := sp.newLoop(sp.ctx, s, sp.sampleAppender(t), sp.reportAppender(t))
|
||||||
|
|
||||||
sp.targets[fp] = t
|
sp.targets[hash] = t
|
||||||
sp.loops[fp] = l
|
sp.loops[hash] = l
|
||||||
|
|
||||||
go l.run(interval, timeout, nil)
|
go l.run(interval, timeout, nil)
|
||||||
}
|
}
|
||||||
|
@ -201,16 +201,16 @@ func (sp *scrapePool) sync(targets []*Target) {
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
|
|
||||||
// Stop and remove old targets and scraper loops.
|
// Stop and remove old targets and scraper loops.
|
||||||
for fp := range sp.targets {
|
for hash := range sp.targets {
|
||||||
if _, ok := fingerprints[fp]; !ok {
|
if _, ok := uniqueTargets[hash]; !ok {
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
go func(l loop) {
|
go func(l loop) {
|
||||||
l.stop()
|
l.stop()
|
||||||
wg.Done()
|
wg.Done()
|
||||||
}(sp.loops[fp])
|
}(sp.loops[hash])
|
||||||
|
|
||||||
delete(sp.loops, fp)
|
delete(sp.loops, hash)
|
||||||
delete(sp.targets, fp)
|
delete(sp.targets, hash)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -60,11 +60,11 @@ func (l *testLoop) stop() {
|
||||||
|
|
||||||
func TestScrapePoolStop(t *testing.T) {
|
func TestScrapePoolStop(t *testing.T) {
|
||||||
sp := &scrapePool{
|
sp := &scrapePool{
|
||||||
targets: map[model.Fingerprint]*Target{},
|
targets: map[uint64]*Target{},
|
||||||
loops: map[model.Fingerprint]loop{},
|
loops: map[uint64]loop{},
|
||||||
}
|
}
|
||||||
var mtx sync.Mutex
|
var mtx sync.Mutex
|
||||||
stopped := map[model.Fingerprint]bool{}
|
stopped := map[uint64]bool{}
|
||||||
numTargets := 20
|
numTargets := 20
|
||||||
|
|
||||||
// Stopping the scrape pool must call stop() on all scrape loops,
|
// Stopping the scrape pool must call stop() on all scrape loops,
|
||||||
|
@ -82,12 +82,12 @@ func TestScrapePoolStop(t *testing.T) {
|
||||||
time.Sleep(time.Duration(i*20) * time.Millisecond)
|
time.Sleep(time.Duration(i*20) * time.Millisecond)
|
||||||
|
|
||||||
mtx.Lock()
|
mtx.Lock()
|
||||||
stopped[t.fingerprint()] = true
|
stopped[t.hash()] = true
|
||||||
mtx.Unlock()
|
mtx.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
sp.targets[t.fingerprint()] = t
|
sp.targets[t.hash()] = t
|
||||||
sp.loops[t.fingerprint()] = l
|
sp.loops[t.hash()] = l
|
||||||
}
|
}
|
||||||
|
|
||||||
done := make(chan struct{})
|
done := make(chan struct{})
|
||||||
|
@ -126,7 +126,7 @@ func TestScrapePoolReload(t *testing.T) {
|
||||||
var mtx sync.Mutex
|
var mtx sync.Mutex
|
||||||
numTargets := 20
|
numTargets := 20
|
||||||
|
|
||||||
stopped := map[model.Fingerprint]bool{}
|
stopped := map[uint64]bool{}
|
||||||
|
|
||||||
reloadCfg := &config.ScrapeConfig{
|
reloadCfg := &config.ScrapeConfig{
|
||||||
ScrapeInterval: model.Duration(3 * time.Second),
|
ScrapeInterval: model.Duration(3 * time.Second),
|
||||||
|
@ -144,7 +144,7 @@ func TestScrapePoolReload(t *testing.T) {
|
||||||
t.Errorf("Expected scrape timeout %d but got %d", 2*time.Second, timeout)
|
t.Errorf("Expected scrape timeout %d but got %d", 2*time.Second, timeout)
|
||||||
}
|
}
|
||||||
mtx.Lock()
|
mtx.Lock()
|
||||||
if !stopped[s.(*targetScraper).fingerprint()] {
|
if !stopped[s.(*targetScraper).hash()] {
|
||||||
t.Errorf("Scrape loop for %v not stopped yet", s.(*targetScraper))
|
t.Errorf("Scrape loop for %v not stopped yet", s.(*targetScraper))
|
||||||
}
|
}
|
||||||
mtx.Unlock()
|
mtx.Unlock()
|
||||||
|
@ -152,8 +152,8 @@ func TestScrapePoolReload(t *testing.T) {
|
||||||
return l
|
return l
|
||||||
}
|
}
|
||||||
sp := &scrapePool{
|
sp := &scrapePool{
|
||||||
targets: map[model.Fingerprint]*Target{},
|
targets: map[uint64]*Target{},
|
||||||
loops: map[model.Fingerprint]loop{},
|
loops: map[uint64]loop{},
|
||||||
newLoop: newLoop,
|
newLoop: newLoop,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -172,18 +172,18 @@ func TestScrapePoolReload(t *testing.T) {
|
||||||
time.Sleep(time.Duration(i*20) * time.Millisecond)
|
time.Sleep(time.Duration(i*20) * time.Millisecond)
|
||||||
|
|
||||||
mtx.Lock()
|
mtx.Lock()
|
||||||
stopped[t.fingerprint()] = true
|
stopped[t.hash()] = true
|
||||||
mtx.Unlock()
|
mtx.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
sp.targets[t.fingerprint()] = t
|
sp.targets[t.hash()] = t
|
||||||
sp.loops[t.fingerprint()] = l
|
sp.loops[t.hash()] = l
|
||||||
}
|
}
|
||||||
done := make(chan struct{})
|
done := make(chan struct{})
|
||||||
|
|
||||||
beforeTargets := map[model.Fingerprint]*Target{}
|
beforeTargets := map[uint64]*Target{}
|
||||||
for fp, t := range sp.targets {
|
for h, t := range sp.targets {
|
||||||
beforeTargets[fp] = t
|
beforeTargets[h] = t
|
||||||
}
|
}
|
||||||
|
|
||||||
reloadTime := time.Now()
|
reloadTime := time.Now()
|
||||||
|
|
|
@ -15,6 +15,7 @@ package retrieval
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"hash/fnv"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/url"
|
"net/url"
|
||||||
|
@ -117,24 +118,21 @@ type Target struct {
|
||||||
// The status object for the target. It is only set once on initialization.
|
// The status object for the target. It is only set once on initialization.
|
||||||
status *TargetStatus
|
status *TargetStatus
|
||||||
|
|
||||||
scrapeConfig *config.ScrapeConfig
|
|
||||||
|
|
||||||
// Mutex protects the members below.
|
|
||||||
sync.RWMutex
|
|
||||||
|
|
||||||
// Labels before any processing.
|
// Labels before any processing.
|
||||||
metaLabels model.LabelSet
|
metaLabels model.LabelSet
|
||||||
// Any labels that are added to this target and its metrics.
|
// Any labels that are added to this target and its metrics.
|
||||||
labels model.LabelSet
|
labels model.LabelSet
|
||||||
|
// Additional URL parmeters that are part of the target URL.
|
||||||
|
params url.Values
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewTarget creates a reasonably configured target for querying.
|
// NewTarget creates a reasonably configured target for querying.
|
||||||
func NewTarget(cfg *config.ScrapeConfig, labels, metaLabels model.LabelSet) *Target {
|
func NewTarget(labels, metaLabels model.LabelSet, params url.Values) *Target {
|
||||||
return &Target{
|
return &Target{
|
||||||
status: &TargetStatus{},
|
status: &TargetStatus{},
|
||||||
scrapeConfig: cfg,
|
labels: labels,
|
||||||
labels: labels,
|
metaLabels: metaLabels,
|
||||||
metaLabels: metaLabels,
|
params: params,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -188,15 +186,16 @@ func newHTTPClient(cfg *config.ScrapeConfig) (*http.Client, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *Target) String() string {
|
func (t *Target) String() string {
|
||||||
return t.host()
|
return t.URL().String()
|
||||||
}
|
}
|
||||||
|
|
||||||
// fingerprint returns an identifying hash for the target.
|
// hash returns an identifying hash for the target.
|
||||||
func (t *Target) fingerprint() model.Fingerprint {
|
func (t *Target) hash() uint64 {
|
||||||
t.RLock()
|
h := fnv.New64a()
|
||||||
defer t.RUnlock()
|
h.Write([]byte(t.labels.Fingerprint().String()))
|
||||||
|
h.Write([]byte(t.URL().String()))
|
||||||
|
|
||||||
return t.labels.Fingerprint()
|
return h.Sum64()
|
||||||
}
|
}
|
||||||
|
|
||||||
// offset returns the time until the next scrape cycle for the target.
|
// offset returns the time until the next scrape cycle for the target.
|
||||||
|
@ -205,7 +204,7 @@ func (t *Target) offset(interval time.Duration) time.Duration {
|
||||||
|
|
||||||
var (
|
var (
|
||||||
base = now % int64(interval)
|
base = now % int64(interval)
|
||||||
offset = uint64(t.fingerprint()) % uint64(interval)
|
offset = t.hash() % uint64(interval)
|
||||||
next = base + int64(offset)
|
next = base + int64(offset)
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -215,35 +214,27 @@ func (t *Target) offset(interval time.Duration) time.Duration {
|
||||||
return time.Duration(next)
|
return time.Duration(next)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *Target) scheme() string {
|
// Labels returns a copy of the set of all public labels of the target.
|
||||||
t.RLock()
|
func (t *Target) Labels() model.LabelSet {
|
||||||
defer t.RUnlock()
|
lset := make(model.LabelSet, len(t.labels))
|
||||||
|
for ln, lv := range t.labels {
|
||||||
return string(t.labels[model.SchemeLabel])
|
if !strings.HasPrefix(string(ln), model.ReservedLabelPrefix) {
|
||||||
|
lset[ln] = lv
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return lset
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *Target) host() string {
|
// MetaLabels returns a copy of the target's labels before any processing.
|
||||||
t.RLock()
|
func (t *Target) MetaLabels() model.LabelSet {
|
||||||
defer t.RUnlock()
|
return t.metaLabels.Clone()
|
||||||
|
|
||||||
return string(t.labels[model.AddressLabel])
|
|
||||||
}
|
|
||||||
|
|
||||||
func (t *Target) path() string {
|
|
||||||
t.RLock()
|
|
||||||
defer t.RUnlock()
|
|
||||||
|
|
||||||
return string(t.labels[model.MetricsPathLabel])
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// URL returns a copy of the target's URL.
|
// URL returns a copy of the target's URL.
|
||||||
func (t *Target) URL() *url.URL {
|
func (t *Target) URL() *url.URL {
|
||||||
t.RLock()
|
|
||||||
defer t.RUnlock()
|
|
||||||
|
|
||||||
params := url.Values{}
|
params := url.Values{}
|
||||||
|
|
||||||
for k, v := range t.scrapeConfig.Params {
|
for k, v := range t.params {
|
||||||
params[k] = make([]string, len(v))
|
params[k] = make([]string, len(v))
|
||||||
copy(params[k], v)
|
copy(params[k], v)
|
||||||
}
|
}
|
||||||
|
@ -329,36 +320,3 @@ func (app relabelAppender) Append(s *model.Sample) error {
|
||||||
|
|
||||||
return app.SampleAppender.Append(s)
|
return app.SampleAppender.Append(s)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Labels returns a copy of the set of all public labels of the target.
|
|
||||||
func (t *Target) Labels() model.LabelSet {
|
|
||||||
t.RLock()
|
|
||||||
defer t.RUnlock()
|
|
||||||
|
|
||||||
return t.unlockedLabels()
|
|
||||||
}
|
|
||||||
|
|
||||||
// unlockedLabels does the same as Labels but does not lock the mutex (useful
|
|
||||||
// for internal usage when the mutex is already locked).
|
|
||||||
func (t *Target) unlockedLabels() model.LabelSet {
|
|
||||||
lset := make(model.LabelSet, len(t.labels))
|
|
||||||
for ln, lv := range t.labels {
|
|
||||||
if !strings.HasPrefix(string(ln), model.ReservedLabelPrefix) {
|
|
||||||
lset[ln] = lv
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if _, ok := lset[model.InstanceLabel]; !ok {
|
|
||||||
lset[model.InstanceLabel] = t.labels[model.AddressLabel]
|
|
||||||
}
|
|
||||||
|
|
||||||
return lset
|
|
||||||
}
|
|
||||||
|
|
||||||
// MetaLabels returns a copy of the target's labels before any processing.
|
|
||||||
func (t *Target) MetaLabels() model.LabelSet {
|
|
||||||
t.RLock()
|
|
||||||
defer t.RUnlock()
|
|
||||||
|
|
||||||
return t.metaLabels.Clone()
|
|
||||||
}
|
|
||||||
|
|
|
@ -34,9 +34,8 @@ import (
|
||||||
func TestTargetLabels(t *testing.T) {
|
func TestTargetLabels(t *testing.T) {
|
||||||
target := newTestTarget("example.com:80", 0, model.LabelSet{"job": "some_job", "foo": "bar"})
|
target := newTestTarget("example.com:80", 0, model.LabelSet{"job": "some_job", "foo": "bar"})
|
||||||
want := model.LabelSet{
|
want := model.LabelSet{
|
||||||
model.JobLabel: "some_job",
|
model.JobLabel: "some_job",
|
||||||
model.InstanceLabel: "example.com:80",
|
"foo": "bar",
|
||||||
"foo": "bar",
|
|
||||||
}
|
}
|
||||||
got := target.Labels()
|
got := target.Labels()
|
||||||
if !reflect.DeepEqual(want, got) {
|
if !reflect.DeepEqual(want, got) {
|
||||||
|
@ -142,10 +141,6 @@ func newTestTarget(targetURL string, deadline time.Duration, labels model.LabelS
|
||||||
labels[model.MetricsPathLabel] = "/metrics"
|
labels[model.MetricsPathLabel] = "/metrics"
|
||||||
|
|
||||||
return &Target{
|
return &Target{
|
||||||
scrapeConfig: &config.ScrapeConfig{
|
|
||||||
ScrapeInterval: model.Duration(time.Millisecond),
|
|
||||||
ScrapeTimeout: model.Duration(deadline),
|
|
||||||
},
|
|
||||||
labels: labels,
|
labels: labels,
|
||||||
status: &TargetStatus{},
|
status: &TargetStatus{},
|
||||||
}
|
}
|
||||||
|
|
|
@ -176,7 +176,7 @@ type targetSet struct {
|
||||||
mtx sync.RWMutex
|
mtx sync.RWMutex
|
||||||
|
|
||||||
// Sets of targets by a source string that is unique across target providers.
|
// Sets of targets by a source string that is unique across target providers.
|
||||||
tgroups map[string]map[model.Fingerprint]*Target
|
tgroups map[string]map[uint64]*Target
|
||||||
providers map[string]TargetProvider
|
providers map[string]TargetProvider
|
||||||
|
|
||||||
scrapePool *scrapePool
|
scrapePool *scrapePool
|
||||||
|
@ -189,7 +189,7 @@ type targetSet struct {
|
||||||
|
|
||||||
func newTargetSet(cfg *config.ScrapeConfig, app storage.SampleAppender) *targetSet {
|
func newTargetSet(cfg *config.ScrapeConfig, app storage.SampleAppender) *targetSet {
|
||||||
ts := &targetSet{
|
ts := &targetSet{
|
||||||
tgroups: map[string]map[model.Fingerprint]*Target{},
|
tgroups: map[string]map[uint64]*Target{},
|
||||||
scrapePool: newScrapePool(cfg, app),
|
scrapePool: newScrapePool(cfg, app),
|
||||||
syncCh: make(chan struct{}, 1),
|
syncCh: make(chan struct{}, 1),
|
||||||
config: cfg,
|
config: cfg,
|
||||||
|
@ -394,8 +394,8 @@ func providersFromConfig(cfg *config.ScrapeConfig) map[string]TargetProvider {
|
||||||
}
|
}
|
||||||
|
|
||||||
// targetsFromGroup builds targets based on the given TargetGroup and config.
|
// targetsFromGroup builds targets based on the given TargetGroup and config.
|
||||||
func targetsFromGroup(tg *config.TargetGroup, cfg *config.ScrapeConfig) (map[model.Fingerprint]*Target, error) {
|
func targetsFromGroup(tg *config.TargetGroup, cfg *config.ScrapeConfig) (map[uint64]*Target, error) {
|
||||||
targets := make(map[model.Fingerprint]*Target, len(tg.Targets))
|
targets := make(map[uint64]*Target, len(tg.Targets))
|
||||||
|
|
||||||
for i, labels := range tg.Targets {
|
for i, labels := range tg.Targets {
|
||||||
for k, v := range cfg.Params {
|
for k, v := range cfg.Params {
|
||||||
|
@ -460,8 +460,12 @@ func targetsFromGroup(tg *config.TargetGroup, cfg *config.ScrapeConfig) (map[mod
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
tr := NewTarget(cfg, labels, preRelabelLabels)
|
if _, ok := labels[model.InstanceLabel]; !ok {
|
||||||
targets[tr.fingerprint()] = tr
|
labels[model.InstanceLabel] = labels[model.AddressLabel]
|
||||||
|
}
|
||||||
|
|
||||||
|
tr := NewTarget(labels, preRelabelLabels, cfg.Params)
|
||||||
|
targets[tr.hash()] = tr
|
||||||
}
|
}
|
||||||
|
|
||||||
return targets, nil
|
return targets, nil
|
||||||
|
|
Loading…
Reference in a new issue