mirror of
https://github.com/prometheus/prometheus.git
synced 2025-02-02 08:31:11 -08:00
Move appender construction from Target to scrapePool
This commit is contained in:
parent
fbe251c2df
commit
775316f8d2
|
@ -23,6 +23,7 @@ import (
|
||||||
"github.com/prometheus/common/model"
|
"github.com/prometheus/common/model"
|
||||||
"golang.org/x/net/context"
|
"golang.org/x/net/context"
|
||||||
|
|
||||||
|
"github.com/prometheus/prometheus/config"
|
||||||
"github.com/prometheus/prometheus/storage"
|
"github.com/prometheus/prometheus/storage"
|
||||||
"github.com/prometheus/prometheus/storage/local"
|
"github.com/prometheus/prometheus/storage/local"
|
||||||
)
|
)
|
||||||
|
@ -69,6 +70,7 @@ func init() {
|
||||||
// scrapePool manages scrapes for sets of targets.
|
// scrapePool manages scrapes for sets of targets.
|
||||||
type scrapePool struct {
|
type scrapePool struct {
|
||||||
appender storage.SampleAppender
|
appender storage.SampleAppender
|
||||||
|
config *config.ScrapeConfig
|
||||||
|
|
||||||
ctx context.Context
|
ctx context.Context
|
||||||
mtx sync.RWMutex
|
mtx sync.RWMutex
|
||||||
|
@ -77,9 +79,10 @@ type scrapePool struct {
|
||||||
targets map[model.Fingerprint]loop
|
targets map[model.Fingerprint]loop
|
||||||
}
|
}
|
||||||
|
|
||||||
func newScrapePool(app storage.SampleAppender) *scrapePool {
|
func newScrapePool(cfg *config.ScrapeConfig, app storage.SampleAppender) *scrapePool {
|
||||||
return &scrapePool{
|
return &scrapePool{
|
||||||
appender: app,
|
appender: app,
|
||||||
|
config: cfg,
|
||||||
tgroups: map[string]map[model.Fingerprint]*Target{},
|
tgroups: map[string]map[model.Fingerprint]*Target{},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -104,6 +107,40 @@ func (sp *scrapePool) stop() {
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// sampleAppender returns an appender for ingested samples from the target.
|
||||||
|
func (sp *scrapePool) sampleAppender(target *Target) storage.SampleAppender {
|
||||||
|
app := sp.appender
|
||||||
|
// The relabelAppender has to be inside the label-modifying appenders
|
||||||
|
// so the relabeling rules are applied to the correct label set.
|
||||||
|
if mrc := sp.config.MetricRelabelConfigs; len(mrc) > 0 {
|
||||||
|
app = relabelAppender{
|
||||||
|
SampleAppender: app,
|
||||||
|
relabelings: mrc,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if sp.config.HonorLabels {
|
||||||
|
app = honorLabelsAppender{
|
||||||
|
SampleAppender: app,
|
||||||
|
labels: target.Labels(),
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
app = ruleLabelsAppender{
|
||||||
|
SampleAppender: app,
|
||||||
|
labels: target.Labels(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return app
|
||||||
|
}
|
||||||
|
|
||||||
|
// reportAppender returns an appender for reporting samples for the target.
|
||||||
|
func (sp *scrapePool) reportAppender(target *Target) storage.SampleAppender {
|
||||||
|
return ruleLabelsAppender{
|
||||||
|
SampleAppender: sp.appender,
|
||||||
|
labels: target.Labels(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (sp *scrapePool) sync(tgroups map[string]map[model.Fingerprint]*Target) {
|
func (sp *scrapePool) sync(tgroups map[string]map[model.Fingerprint]*Target) {
|
||||||
sp.mtx.Lock()
|
sp.mtx.Lock()
|
||||||
|
|
||||||
|
@ -127,7 +164,7 @@ func (sp *scrapePool) sync(tgroups map[string]map[model.Fingerprint]*Target) {
|
||||||
} else {
|
} else {
|
||||||
newTargets[fp] = tnew
|
newTargets[fp] = tnew
|
||||||
|
|
||||||
tnew.scrapeLoop = newScrapeLoop(sp.ctx, tnew, tnew.wrapAppender(sp.appender), tnew.wrapReportingAppender(sp.appender))
|
tnew.scrapeLoop = newScrapeLoop(sp.ctx, tnew, sp.sampleAppender(tnew), sp.reportAppender(tnew))
|
||||||
go tnew.scrapeLoop.run(tnew.interval(), tnew.timeout(), nil)
|
go tnew.scrapeLoop.run(tnew.interval(), tnew.timeout(), nil)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -20,9 +20,86 @@ import (
|
||||||
"github.com/prometheus/common/model"
|
"github.com/prometheus/common/model"
|
||||||
"golang.org/x/net/context"
|
"golang.org/x/net/context"
|
||||||
|
|
||||||
// "github.com/prometheus/prometheus/storage"
|
"github.com/prometheus/prometheus/config"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
func TestScrapePoolReportAppender(t *testing.T) {
|
||||||
|
cfg := &config.ScrapeConfig{
|
||||||
|
MetricRelabelConfigs: []*config.RelabelConfig{
|
||||||
|
{}, {}, {},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
target := newTestTarget("example.com:80", 10*time.Millisecond, nil)
|
||||||
|
app := &nopAppender{}
|
||||||
|
|
||||||
|
sp := newScrapePool(cfg, app)
|
||||||
|
|
||||||
|
cfg.HonorLabels = false
|
||||||
|
wrapped := sp.reportAppender(target)
|
||||||
|
|
||||||
|
rl, ok := wrapped.(ruleLabelsAppender)
|
||||||
|
if !ok {
|
||||||
|
t.Fatalf("Expected ruleLabelsAppender but got %T", wrapped)
|
||||||
|
}
|
||||||
|
if rl.SampleAppender != app {
|
||||||
|
t.Fatalf("Expected base appender but got %T", rl.SampleAppender)
|
||||||
|
}
|
||||||
|
|
||||||
|
cfg.HonorLabels = true
|
||||||
|
wrapped = sp.reportAppender(target)
|
||||||
|
|
||||||
|
hl, ok := wrapped.(ruleLabelsAppender)
|
||||||
|
if !ok {
|
||||||
|
t.Fatalf("Expected ruleLabelsAppender but got %T", wrapped)
|
||||||
|
}
|
||||||
|
if hl.SampleAppender != app {
|
||||||
|
t.Fatalf("Expected base appender but got %T", hl.SampleAppender)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestScrapePoolSampleAppender(t *testing.T) {
|
||||||
|
cfg := &config.ScrapeConfig{
|
||||||
|
MetricRelabelConfigs: []*config.RelabelConfig{
|
||||||
|
{}, {}, {},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
target := newTestTarget("example.com:80", 10*time.Millisecond, nil)
|
||||||
|
app := &nopAppender{}
|
||||||
|
|
||||||
|
sp := newScrapePool(cfg, app)
|
||||||
|
|
||||||
|
cfg.HonorLabels = false
|
||||||
|
wrapped := sp.sampleAppender(target)
|
||||||
|
|
||||||
|
rl, ok := wrapped.(ruleLabelsAppender)
|
||||||
|
if !ok {
|
||||||
|
t.Fatalf("Expected ruleLabelsAppender but got %T", wrapped)
|
||||||
|
}
|
||||||
|
re, ok := rl.SampleAppender.(relabelAppender)
|
||||||
|
if !ok {
|
||||||
|
t.Fatalf("Expected relabelAppender but got %T", rl.SampleAppender)
|
||||||
|
}
|
||||||
|
if re.SampleAppender != app {
|
||||||
|
t.Fatalf("Expected base appender but got %T", re.SampleAppender)
|
||||||
|
}
|
||||||
|
|
||||||
|
cfg.HonorLabels = true
|
||||||
|
wrapped = sp.sampleAppender(target)
|
||||||
|
|
||||||
|
hl, ok := wrapped.(honorLabelsAppender)
|
||||||
|
if !ok {
|
||||||
|
t.Fatalf("Expected honorLabelsAppender but got %T", wrapped)
|
||||||
|
}
|
||||||
|
re, ok = hl.SampleAppender.(relabelAppender)
|
||||||
|
if !ok {
|
||||||
|
t.Fatalf("Expected relabelAppender but got %T", hl.SampleAppender)
|
||||||
|
}
|
||||||
|
if re.SampleAppender != app {
|
||||||
|
t.Fatalf("Expected base appender but got %T", re.SampleAppender)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestScrapeLoopRun(t *testing.T) {
|
func TestScrapeLoopRun(t *testing.T) {
|
||||||
var (
|
var (
|
||||||
signal = make(chan struct{})
|
signal = make(chan struct{})
|
||||||
|
|
|
@ -121,13 +121,12 @@ type Target struct {
|
||||||
// The status object for the target. It is only set once on initialization.
|
// The status object for the target. It is only set once on initialization.
|
||||||
status *TargetStatus
|
status *TargetStatus
|
||||||
|
|
||||||
scrapeLoop *scrapeLoop
|
scrapeLoop *scrapeLoop
|
||||||
|
scrapeConfig *config.ScrapeConfig
|
||||||
|
|
||||||
// Mutex protects the members below.
|
// Mutex protects the members below.
|
||||||
sync.RWMutex
|
sync.RWMutex
|
||||||
|
|
||||||
scrapeConfig *config.ScrapeConfig
|
|
||||||
|
|
||||||
// Labels before any processing.
|
// Labels before any processing.
|
||||||
metaLabels model.LabelSet
|
metaLabels model.LabelSet
|
||||||
// Any labels that are added to this target and its metrics.
|
// Any labels that are added to this target and its metrics.
|
||||||
|
@ -265,42 +264,6 @@ func (t *Target) path() string {
|
||||||
return string(t.labels[model.MetricsPathLabel])
|
return string(t.labels[model.MetricsPathLabel])
|
||||||
}
|
}
|
||||||
|
|
||||||
// wrapAppender wraps a SampleAppender for samples ingested from the target.
|
|
||||||
// RLock must be acquired by the caller.
|
|
||||||
func (t *Target) wrapAppender(app storage.SampleAppender) storage.SampleAppender {
|
|
||||||
// The relabelAppender has to be inside the label-modifying appenders
|
|
||||||
// so the relabeling rules are applied to the correct label set.
|
|
||||||
if mrc := t.scrapeConfig.MetricRelabelConfigs; len(mrc) > 0 {
|
|
||||||
app = relabelAppender{
|
|
||||||
SampleAppender: app,
|
|
||||||
relabelings: mrc,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if t.scrapeConfig.HonorLabels {
|
|
||||||
app = honorLabelsAppender{
|
|
||||||
SampleAppender: app,
|
|
||||||
labels: t.unlockedLabels(),
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
app = ruleLabelsAppender{
|
|
||||||
SampleAppender: app,
|
|
||||||
labels: t.unlockedLabels(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return app
|
|
||||||
}
|
|
||||||
|
|
||||||
// wrapReportingAppender wraps an appender for target status report samples.
|
|
||||||
// It ignores any relabeling rules set for the target.
|
|
||||||
// RLock must not be acquired by the caller.
|
|
||||||
func (t *Target) wrapReportingAppender(app storage.SampleAppender) storage.SampleAppender {
|
|
||||||
return ruleLabelsAppender{
|
|
||||||
SampleAppender: app,
|
|
||||||
labels: t.Labels(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// URL returns a copy of the target's URL.
|
// URL returns a copy of the target's URL.
|
||||||
func (t *Target) URL() *url.URL {
|
func (t *Target) URL() *url.URL {
|
||||||
t.RLock()
|
t.RLock()
|
||||||
|
|
|
@ -92,82 +92,6 @@ func TestTargetOffset(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestTargetWrapReportingAppender(t *testing.T) {
|
|
||||||
cfg := &config.ScrapeConfig{
|
|
||||||
MetricRelabelConfigs: []*config.RelabelConfig{
|
|
||||||
{}, {}, {},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
target := newTestTarget("example.com:80", 10*time.Millisecond, nil)
|
|
||||||
target.scrapeConfig = cfg
|
|
||||||
app := &nopAppender{}
|
|
||||||
|
|
||||||
cfg.HonorLabels = false
|
|
||||||
wrapped := target.wrapReportingAppender(app)
|
|
||||||
|
|
||||||
rl, ok := wrapped.(ruleLabelsAppender)
|
|
||||||
if !ok {
|
|
||||||
t.Fatalf("Expected ruleLabelsAppender but got %T", wrapped)
|
|
||||||
}
|
|
||||||
if rl.SampleAppender != app {
|
|
||||||
t.Fatalf("Expected base appender but got %T", rl.SampleAppender)
|
|
||||||
}
|
|
||||||
|
|
||||||
cfg.HonorLabels = true
|
|
||||||
wrapped = target.wrapReportingAppender(app)
|
|
||||||
|
|
||||||
hl, ok := wrapped.(ruleLabelsAppender)
|
|
||||||
if !ok {
|
|
||||||
t.Fatalf("Expected ruleLabelsAppender but got %T", wrapped)
|
|
||||||
}
|
|
||||||
if hl.SampleAppender != app {
|
|
||||||
t.Fatalf("Expected base appender but got %T", hl.SampleAppender)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestTargetWrapAppender(t *testing.T) {
|
|
||||||
cfg := &config.ScrapeConfig{
|
|
||||||
MetricRelabelConfigs: []*config.RelabelConfig{
|
|
||||||
{}, {}, {},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
target := newTestTarget("example.com:80", 10*time.Millisecond, nil)
|
|
||||||
target.scrapeConfig = cfg
|
|
||||||
app := &nopAppender{}
|
|
||||||
|
|
||||||
cfg.HonorLabels = false
|
|
||||||
wrapped := target.wrapAppender(app)
|
|
||||||
|
|
||||||
rl, ok := wrapped.(ruleLabelsAppender)
|
|
||||||
if !ok {
|
|
||||||
t.Fatalf("Expected ruleLabelsAppender but got %T", wrapped)
|
|
||||||
}
|
|
||||||
re, ok := rl.SampleAppender.(relabelAppender)
|
|
||||||
if !ok {
|
|
||||||
t.Fatalf("Expected relabelAppender but got %T", rl.SampleAppender)
|
|
||||||
}
|
|
||||||
if re.SampleAppender != app {
|
|
||||||
t.Fatalf("Expected base appender but got %T", re.SampleAppender)
|
|
||||||
}
|
|
||||||
|
|
||||||
cfg.HonorLabels = true
|
|
||||||
wrapped = target.wrapAppender(app)
|
|
||||||
|
|
||||||
hl, ok := wrapped.(honorLabelsAppender)
|
|
||||||
if !ok {
|
|
||||||
t.Fatalf("Expected honorLabelsAppender but got %T", wrapped)
|
|
||||||
}
|
|
||||||
re, ok = hl.SampleAppender.(relabelAppender)
|
|
||||||
if !ok {
|
|
||||||
t.Fatalf("Expected relabelAppender but got %T", hl.SampleAppender)
|
|
||||||
}
|
|
||||||
if re.SampleAppender != app {
|
|
||||||
t.Fatalf("Expected base appender but got %T", re.SampleAppender)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestTargetScrape404(t *testing.T) {
|
func TestTargetScrape404(t *testing.T) {
|
||||||
server := httptest.NewServer(
|
server := httptest.NewServer(
|
||||||
http.HandlerFunc(
|
http.HandlerFunc(
|
||||||
|
|
|
@ -184,7 +184,7 @@ type targetSet struct {
|
||||||
func newTargetSet(cfg *config.ScrapeConfig, app storage.SampleAppender) *targetSet {
|
func newTargetSet(cfg *config.ScrapeConfig, app storage.SampleAppender) *targetSet {
|
||||||
ts := &targetSet{
|
ts := &targetSet{
|
||||||
tgroups: map[string]map[model.Fingerprint]*Target{},
|
tgroups: map[string]map[model.Fingerprint]*Target{},
|
||||||
scrapePool: newScrapePool(app),
|
scrapePool: newScrapePool(cfg, app),
|
||||||
syncCh: make(chan struct{}, 1),
|
syncCh: make(chan struct{}, 1),
|
||||||
config: cfg,
|
config: cfg,
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue