mirror of
https://github.com/prometheus/prometheus.git
synced 2025-01-14 07:17:52 -08:00
Merge pull request #1429 from prometheus/scraperef7
Next iteration of retrieval refactoring
This commit is contained in:
commit
cf56e33030
|
@ -23,6 +23,7 @@ import (
|
||||||
"github.com/prometheus/common/model"
|
"github.com/prometheus/common/model"
|
||||||
"golang.org/x/net/context"
|
"golang.org/x/net/context"
|
||||||
|
|
||||||
|
"github.com/prometheus/prometheus/config"
|
||||||
"github.com/prometheus/prometheus/storage"
|
"github.com/prometheus/prometheus/storage"
|
||||||
"github.com/prometheus/prometheus/storage/local"
|
"github.com/prometheus/prometheus/storage/local"
|
||||||
)
|
)
|
||||||
|
@ -69,106 +70,169 @@ func init() {
|
||||||
// scrapePool manages scrapes for sets of targets.
|
// scrapePool manages scrapes for sets of targets.
|
||||||
type scrapePool struct {
|
type scrapePool struct {
|
||||||
appender storage.SampleAppender
|
appender storage.SampleAppender
|
||||||
|
config *config.ScrapeConfig
|
||||||
|
|
||||||
ctx context.Context
|
ctx context.Context
|
||||||
|
|
||||||
|
// Targets and loops must always be synchronized to have the same
|
||||||
|
// set of fingerprints.
|
||||||
mtx sync.RWMutex
|
mtx sync.RWMutex
|
||||||
tgroups map[string]map[model.Fingerprint]*Target
|
targets map[model.Fingerprint]*Target
|
||||||
|
loops map[model.Fingerprint]loop
|
||||||
|
|
||||||
targets map[model.Fingerprint]loop
|
// Constructor for new scrape loops. This is settable for testing convenience.
|
||||||
|
newLoop func(context.Context, scraper, storage.SampleAppender, storage.SampleAppender) loop
|
||||||
}
|
}
|
||||||
|
|
||||||
func newScrapePool(app storage.SampleAppender) *scrapePool {
|
func newScrapePool(cfg *config.ScrapeConfig, app storage.SampleAppender) *scrapePool {
|
||||||
return &scrapePool{
|
return &scrapePool{
|
||||||
appender: app,
|
appender: app,
|
||||||
tgroups: map[string]map[model.Fingerprint]*Target{},
|
config: cfg,
|
||||||
|
targets: map[model.Fingerprint]*Target{},
|
||||||
|
loops: map[model.Fingerprint]loop{},
|
||||||
|
newLoop: newScrapeLoop,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// stop terminates all scrape loops and returns after they all terminated.
|
||||||
func (sp *scrapePool) stop() {
|
func (sp *scrapePool) stop() {
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
|
|
||||||
sp.mtx.RLock()
|
sp.mtx.Lock()
|
||||||
|
defer sp.mtx.Unlock()
|
||||||
|
|
||||||
for _, tgroup := range sp.tgroups {
|
for fp, l := range sp.loops {
|
||||||
for _, t := range tgroup {
|
wg.Add(1)
|
||||||
wg.Add(1)
|
|
||||||
|
|
||||||
go func(t *Target) {
|
go func(l loop) {
|
||||||
t.scrapeLoop.stop()
|
l.stop()
|
||||||
wg.Done()
|
wg.Done()
|
||||||
}(t)
|
}(l)
|
||||||
}
|
|
||||||
|
delete(sp.loops, fp)
|
||||||
|
delete(sp.targets, fp)
|
||||||
}
|
}
|
||||||
sp.mtx.RUnlock()
|
|
||||||
|
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (sp *scrapePool) sync(tgroups map[string]map[model.Fingerprint]*Target) {
|
// reload the scrape pool with the given scrape configuration. The target state is preserved
|
||||||
|
// but all scrape loops are restarted with the new scrape configuration.
|
||||||
|
// This method returns after all scrape loops that were stopped have fully terminated.
|
||||||
|
func (sp *scrapePool) reload(cfg *config.ScrapeConfig) {
|
||||||
sp.mtx.Lock()
|
sp.mtx.Lock()
|
||||||
|
defer sp.mtx.Unlock()
|
||||||
|
|
||||||
|
sp.config = cfg
|
||||||
|
|
||||||
var (
|
var (
|
||||||
wg sync.WaitGroup
|
wg sync.WaitGroup
|
||||||
newTgroups = map[string]map[model.Fingerprint]*Target{}
|
interval = time.Duration(sp.config.ScrapeInterval)
|
||||||
|
timeout = time.Duration(sp.config.ScrapeTimeout)
|
||||||
)
|
)
|
||||||
|
|
||||||
for source, targets := range tgroups {
|
for fp, oldLoop := range sp.loops {
|
||||||
var (
|
var (
|
||||||
prevTargets = sp.tgroups[source]
|
t = sp.targets[fp]
|
||||||
newTargets = map[model.Fingerprint]*Target{}
|
newLoop = sp.newLoop(sp.ctx, t, sp.sampleAppender(t), sp.reportAppender(t))
|
||||||
)
|
)
|
||||||
newTgroups[source] = newTargets
|
wg.Add(1)
|
||||||
|
|
||||||
for fp, tnew := range targets {
|
go func(oldLoop, newLoop loop) {
|
||||||
// If the same target existed before, we let it run and replace
|
oldLoop.stop()
|
||||||
// the new one with it.
|
wg.Done()
|
||||||
if told, ok := prevTargets[fp]; ok {
|
|
||||||
newTargets[fp] = told
|
|
||||||
} else {
|
|
||||||
newTargets[fp] = tnew
|
|
||||||
|
|
||||||
tnew.scrapeLoop = newScrapeLoop(sp.ctx, tnew, tnew.wrapAppender(sp.appender), tnew.wrapReportingAppender(sp.appender))
|
go newLoop.run(interval, timeout, nil)
|
||||||
go tnew.scrapeLoop.run(tnew.interval(), tnew.timeout(), nil)
|
}(oldLoop, newLoop)
|
||||||
}
|
|
||||||
}
|
|
||||||
for fp, told := range prevTargets {
|
|
||||||
// A previous target is no longer in the group.
|
|
||||||
if _, ok := targets[fp]; !ok {
|
|
||||||
wg.Add(1)
|
|
||||||
|
|
||||||
go func(told *Target) {
|
sp.loops[fp] = newLoop
|
||||||
told.scrapeLoop.stop()
|
}
|
||||||
wg.Done()
|
|
||||||
}(told)
|
wg.Wait()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// sync takes a list of potentially duplicated targets, deduplicates them, starts
|
||||||
|
// scrape loops for new targets, and stops scrape loops for disappeared targets.
|
||||||
|
// It returns after all stopped scrape loops terminated.
|
||||||
|
func (sp *scrapePool) sync(targets []*Target) {
|
||||||
|
sp.mtx.Lock()
|
||||||
|
defer sp.mtx.Unlock()
|
||||||
|
|
||||||
|
var (
|
||||||
|
fingerprints = map[model.Fingerprint]struct{}{}
|
||||||
|
interval = time.Duration(sp.config.ScrapeInterval)
|
||||||
|
timeout = time.Duration(sp.config.ScrapeTimeout)
|
||||||
|
)
|
||||||
|
|
||||||
|
for _, t := range targets {
|
||||||
|
fp := t.fingerprint()
|
||||||
|
fingerprints[fp] = struct{}{}
|
||||||
|
|
||||||
|
if _, ok := sp.targets[fp]; !ok {
|
||||||
|
l := sp.newLoop(sp.ctx, t, sp.sampleAppender(t), sp.reportAppender(t))
|
||||||
|
|
||||||
|
sp.targets[fp] = t
|
||||||
|
sp.loops[fp] = l
|
||||||
|
|
||||||
|
go l.run(interval, timeout, nil)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Stop scrapers for target groups that disappeared completely.
|
var wg sync.WaitGroup
|
||||||
for source, targets := range sp.tgroups {
|
|
||||||
if _, ok := tgroups[source]; ok {
|
// Stop and remove old targets and scraper loops.
|
||||||
continue
|
for fp := range sp.targets {
|
||||||
}
|
if _, ok := fingerprints[fp]; !ok {
|
||||||
for _, told := range targets {
|
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
|
go func(l loop) {
|
||||||
go func(told *Target) {
|
l.stop()
|
||||||
told.scrapeLoop.stop()
|
|
||||||
wg.Done()
|
wg.Done()
|
||||||
}(told)
|
}(sp.loops[fp])
|
||||||
|
|
||||||
|
delete(sp.loops, fp)
|
||||||
|
delete(sp.targets, fp)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
sp.tgroups = newTgroups
|
|
||||||
|
|
||||||
// Wait for all potentially stopped scrapers to terminate.
|
// Wait for all potentially stopped scrapers to terminate.
|
||||||
// This covers the case of flapping targets. If the server is under high load, a new scraper
|
// This covers the case of flapping targets. If the server is under high load, a new scraper
|
||||||
// may be active and tries to insert. The old scraper that didn't terminate yet could still
|
// may be active and tries to insert. The old scraper that didn't terminate yet could still
|
||||||
// be inserting a previous sample set.
|
// be inserting a previous sample set.
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
|
}
|
||||||
|
|
||||||
// TODO(fabxc): maybe this can be released earlier with subsequent refactoring.
|
// sampleAppender returns an appender for ingested samples from the target.
|
||||||
sp.mtx.Unlock()
|
func (sp *scrapePool) sampleAppender(target *Target) storage.SampleAppender {
|
||||||
|
app := sp.appender
|
||||||
|
// The relabelAppender has to be inside the label-modifying appenders
|
||||||
|
// so the relabeling rules are applied to the correct label set.
|
||||||
|
if mrc := sp.config.MetricRelabelConfigs; len(mrc) > 0 {
|
||||||
|
app = relabelAppender{
|
||||||
|
SampleAppender: app,
|
||||||
|
relabelings: mrc,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if sp.config.HonorLabels {
|
||||||
|
app = honorLabelsAppender{
|
||||||
|
SampleAppender: app,
|
||||||
|
labels: target.Labels(),
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
app = ruleLabelsAppender{
|
||||||
|
SampleAppender: app,
|
||||||
|
labels: target.Labels(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return app
|
||||||
|
}
|
||||||
|
|
||||||
|
// reportAppender returns an appender for reporting samples for the target.
|
||||||
|
func (sp *scrapePool) reportAppender(target *Target) storage.SampleAppender {
|
||||||
|
return ruleLabelsAppender{
|
||||||
|
SampleAppender: sp.appender,
|
||||||
|
labels: target.Labels(),
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// A scraper retrieves samples and accepts a status report at the end.
|
// A scraper retrieves samples and accepts a status report at the end.
|
||||||
|
@ -178,6 +242,7 @@ type scraper interface {
|
||||||
offset(interval time.Duration) time.Duration
|
offset(interval time.Duration) time.Duration
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// A loop can run and be stopped again. It must not be reused after it was stopped.
|
||||||
type loop interface {
|
type loop interface {
|
||||||
run(interval, timeout time.Duration, errc chan<- error)
|
run(interval, timeout time.Duration, errc chan<- error)
|
||||||
stop()
|
stop()
|
||||||
|
@ -190,12 +255,11 @@ type scrapeLoop struct {
|
||||||
reportAppender storage.SampleAppender
|
reportAppender storage.SampleAppender
|
||||||
|
|
||||||
done chan struct{}
|
done chan struct{}
|
||||||
mtx sync.RWMutex
|
|
||||||
ctx context.Context
|
ctx context.Context
|
||||||
cancel func()
|
cancel func()
|
||||||
}
|
}
|
||||||
|
|
||||||
func newScrapeLoop(ctx context.Context, sc scraper, app, reportApp storage.SampleAppender) *scrapeLoop {
|
func newScrapeLoop(ctx context.Context, sc scraper, app, reportApp storage.SampleAppender) loop {
|
||||||
sl := &scrapeLoop{
|
sl := &scrapeLoop{
|
||||||
scraper: sc,
|
scraper: sc,
|
||||||
appender: app,
|
appender: app,
|
||||||
|
@ -264,10 +328,7 @@ func (sl *scrapeLoop) run(interval, timeout time.Duration, errc chan<- error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (sl *scrapeLoop) stop() {
|
func (sl *scrapeLoop) stop() {
|
||||||
sl.mtx.RLock()
|
|
||||||
sl.cancel()
|
sl.cancel()
|
||||||
sl.mtx.RUnlock()
|
|
||||||
|
|
||||||
<-sl.done
|
<-sl.done
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -14,15 +14,333 @@
|
||||||
package retrieval
|
package retrieval
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"fmt"
|
||||||
|
"reflect"
|
||||||
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/prometheus/common/model"
|
"github.com/prometheus/common/model"
|
||||||
"golang.org/x/net/context"
|
"golang.org/x/net/context"
|
||||||
|
|
||||||
// "github.com/prometheus/prometheus/storage"
|
"github.com/prometheus/prometheus/config"
|
||||||
|
"github.com/prometheus/prometheus/storage"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
func TestNewScrapePool(t *testing.T) {
|
||||||
|
var (
|
||||||
|
app = &nopAppender{}
|
||||||
|
cfg = &config.ScrapeConfig{}
|
||||||
|
sp = newScrapePool(cfg, app)
|
||||||
|
)
|
||||||
|
|
||||||
|
if a, ok := sp.appender.(*nopAppender); !ok || a != app {
|
||||||
|
t.Fatalf("Wrong sample appender")
|
||||||
|
}
|
||||||
|
if sp.config != cfg {
|
||||||
|
t.Fatalf("Wrong scrape config")
|
||||||
|
}
|
||||||
|
if sp.newLoop == nil {
|
||||||
|
t.Fatalf("newLoop function not initialized")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
type testLoop struct {
|
||||||
|
startFunc func(interval, timeout time.Duration, errc chan<- error)
|
||||||
|
stopFunc func()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *testLoop) run(interval, timeout time.Duration, errc chan<- error) {
|
||||||
|
l.startFunc(interval, timeout, errc)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *testLoop) stop() {
|
||||||
|
l.stopFunc()
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestScrapePoolStop(t *testing.T) {
|
||||||
|
sp := &scrapePool{
|
||||||
|
targets: map[model.Fingerprint]*Target{},
|
||||||
|
loops: map[model.Fingerprint]loop{},
|
||||||
|
}
|
||||||
|
var mtx sync.Mutex
|
||||||
|
stopped := map[model.Fingerprint]bool{}
|
||||||
|
numTargets := 20
|
||||||
|
|
||||||
|
// Stopping the scrape pool must call stop() on all scrape loops,
|
||||||
|
// clean them and the respective targets up. It must wait until each loop's
|
||||||
|
// stop function returned before returning itself.
|
||||||
|
|
||||||
|
for i := 0; i < numTargets; i++ {
|
||||||
|
t := &Target{
|
||||||
|
labels: model.LabelSet{
|
||||||
|
model.AddressLabel: model.LabelValue(fmt.Sprintf("example.com:%d", i)),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
l := &testLoop{}
|
||||||
|
l.stopFunc = func() {
|
||||||
|
time.Sleep(time.Duration(i*20) * time.Millisecond)
|
||||||
|
|
||||||
|
mtx.Lock()
|
||||||
|
stopped[t.fingerprint()] = true
|
||||||
|
mtx.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
sp.targets[t.fingerprint()] = t
|
||||||
|
sp.loops[t.fingerprint()] = l
|
||||||
|
}
|
||||||
|
|
||||||
|
done := make(chan struct{})
|
||||||
|
stopTime := time.Now()
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
sp.stop()
|
||||||
|
close(done)
|
||||||
|
}()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-time.After(5 * time.Second):
|
||||||
|
t.Fatalf("scrapeLoop.stop() did not return as expected")
|
||||||
|
case <-done:
|
||||||
|
// This should have taken at least as long as the last target slept.
|
||||||
|
if time.Since(stopTime) < time.Duration(numTargets*20)*time.Millisecond {
|
||||||
|
t.Fatalf("scrapeLoop.stop() exited before all targets stopped")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
mtx.Lock()
|
||||||
|
if len(stopped) != numTargets {
|
||||||
|
t.Fatalf("Expected 20 stopped loops, got %d", len(stopped))
|
||||||
|
}
|
||||||
|
mtx.Unlock()
|
||||||
|
|
||||||
|
if len(sp.targets) > 0 {
|
||||||
|
t.Fatalf("Targets were not cleared on stopping: %d left", len(sp.targets))
|
||||||
|
}
|
||||||
|
if len(sp.loops) > 0 {
|
||||||
|
t.Fatalf("Loops were not cleared on stopping: %d left", len(sp.loops))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestScrapePoolReload(t *testing.T) {
|
||||||
|
var mtx sync.Mutex
|
||||||
|
numTargets := 20
|
||||||
|
|
||||||
|
stopped := map[model.Fingerprint]bool{}
|
||||||
|
|
||||||
|
reloadCfg := &config.ScrapeConfig{
|
||||||
|
ScrapeInterval: model.Duration(3 * time.Second),
|
||||||
|
ScrapeTimeout: model.Duration(2 * time.Second),
|
||||||
|
}
|
||||||
|
// On starting to run, new loops created on reload check whether their preceeding
|
||||||
|
// equivalents have been stopped.
|
||||||
|
newLoop := func(ctx context.Context, s scraper, app, reportApp storage.SampleAppender) loop {
|
||||||
|
l := &testLoop{}
|
||||||
|
l.startFunc = func(interval, timeout time.Duration, errc chan<- error) {
|
||||||
|
if interval != 3*time.Second {
|
||||||
|
t.Errorf("Expected scrape interval %d but got %d", 3*time.Second, interval)
|
||||||
|
}
|
||||||
|
if timeout != 2*time.Second {
|
||||||
|
t.Errorf("Expected scrape timeout %d but got %d", 2*time.Second, timeout)
|
||||||
|
}
|
||||||
|
mtx.Lock()
|
||||||
|
if !stopped[s.(*Target).fingerprint()] {
|
||||||
|
t.Errorf("Scrape loop for %v not stopped yet", s.(*Target))
|
||||||
|
}
|
||||||
|
mtx.Unlock()
|
||||||
|
}
|
||||||
|
return l
|
||||||
|
}
|
||||||
|
sp := &scrapePool{
|
||||||
|
targets: map[model.Fingerprint]*Target{},
|
||||||
|
loops: map[model.Fingerprint]loop{},
|
||||||
|
newLoop: newLoop,
|
||||||
|
}
|
||||||
|
|
||||||
|
// Reloading a scrape pool with a new scrape configuration must stop all scrape
|
||||||
|
// loops and start new ones. A new loop must not be started before the preceeding
|
||||||
|
// one terminated.
|
||||||
|
|
||||||
|
for i := 0; i < numTargets; i++ {
|
||||||
|
t := &Target{
|
||||||
|
labels: model.LabelSet{
|
||||||
|
model.AddressLabel: model.LabelValue(fmt.Sprintf("example.com:%d", i)),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
l := &testLoop{}
|
||||||
|
l.stopFunc = func() {
|
||||||
|
time.Sleep(time.Duration(i*20) * time.Millisecond)
|
||||||
|
|
||||||
|
mtx.Lock()
|
||||||
|
stopped[t.fingerprint()] = true
|
||||||
|
mtx.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
sp.targets[t.fingerprint()] = t
|
||||||
|
sp.loops[t.fingerprint()] = l
|
||||||
|
}
|
||||||
|
done := make(chan struct{})
|
||||||
|
|
||||||
|
beforeTargets := map[model.Fingerprint]*Target{}
|
||||||
|
for fp, t := range sp.targets {
|
||||||
|
beforeTargets[fp] = t
|
||||||
|
}
|
||||||
|
|
||||||
|
reloadTime := time.Now()
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
sp.reload(reloadCfg)
|
||||||
|
close(done)
|
||||||
|
}()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-time.After(5 * time.Second):
|
||||||
|
t.Fatalf("scrapeLoop.reload() did not return as expected")
|
||||||
|
case <-done:
|
||||||
|
// This should have taken at least as long as the last target slept.
|
||||||
|
if time.Since(reloadTime) < time.Duration(numTargets*20)*time.Millisecond {
|
||||||
|
t.Fatalf("scrapeLoop.stop() exited before all targets stopped")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
mtx.Lock()
|
||||||
|
if len(stopped) != numTargets {
|
||||||
|
t.Fatalf("Expected 20 stopped loops, got %d", stopped)
|
||||||
|
}
|
||||||
|
mtx.Unlock()
|
||||||
|
|
||||||
|
if !reflect.DeepEqual(sp.targets, beforeTargets) {
|
||||||
|
t.Fatalf("Reloading affected target states unexpectedly")
|
||||||
|
}
|
||||||
|
if len(sp.loops) != numTargets {
|
||||||
|
t.Fatalf("Expected %d loops after reload but got %d", numTargets, len(sp.loops))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestScrapePoolReportAppender(t *testing.T) {
|
||||||
|
cfg := &config.ScrapeConfig{
|
||||||
|
MetricRelabelConfigs: []*config.RelabelConfig{
|
||||||
|
{}, {}, {},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
target := newTestTarget("example.com:80", 10*time.Millisecond, nil)
|
||||||
|
app := &nopAppender{}
|
||||||
|
|
||||||
|
sp := newScrapePool(cfg, app)
|
||||||
|
|
||||||
|
cfg.HonorLabels = false
|
||||||
|
wrapped := sp.reportAppender(target)
|
||||||
|
|
||||||
|
rl, ok := wrapped.(ruleLabelsAppender)
|
||||||
|
if !ok {
|
||||||
|
t.Fatalf("Expected ruleLabelsAppender but got %T", wrapped)
|
||||||
|
}
|
||||||
|
if rl.SampleAppender != app {
|
||||||
|
t.Fatalf("Expected base appender but got %T", rl.SampleAppender)
|
||||||
|
}
|
||||||
|
|
||||||
|
cfg.HonorLabels = true
|
||||||
|
wrapped = sp.reportAppender(target)
|
||||||
|
|
||||||
|
hl, ok := wrapped.(ruleLabelsAppender)
|
||||||
|
if !ok {
|
||||||
|
t.Fatalf("Expected ruleLabelsAppender but got %T", wrapped)
|
||||||
|
}
|
||||||
|
if hl.SampleAppender != app {
|
||||||
|
t.Fatalf("Expected base appender but got %T", hl.SampleAppender)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestScrapePoolSampleAppender(t *testing.T) {
|
||||||
|
cfg := &config.ScrapeConfig{
|
||||||
|
MetricRelabelConfigs: []*config.RelabelConfig{
|
||||||
|
{}, {}, {},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
target := newTestTarget("example.com:80", 10*time.Millisecond, nil)
|
||||||
|
app := &nopAppender{}
|
||||||
|
|
||||||
|
sp := newScrapePool(cfg, app)
|
||||||
|
|
||||||
|
cfg.HonorLabels = false
|
||||||
|
wrapped := sp.sampleAppender(target)
|
||||||
|
|
||||||
|
rl, ok := wrapped.(ruleLabelsAppender)
|
||||||
|
if !ok {
|
||||||
|
t.Fatalf("Expected ruleLabelsAppender but got %T", wrapped)
|
||||||
|
}
|
||||||
|
re, ok := rl.SampleAppender.(relabelAppender)
|
||||||
|
if !ok {
|
||||||
|
t.Fatalf("Expected relabelAppender but got %T", rl.SampleAppender)
|
||||||
|
}
|
||||||
|
if re.SampleAppender != app {
|
||||||
|
t.Fatalf("Expected base appender but got %T", re.SampleAppender)
|
||||||
|
}
|
||||||
|
|
||||||
|
cfg.HonorLabels = true
|
||||||
|
wrapped = sp.sampleAppender(target)
|
||||||
|
|
||||||
|
hl, ok := wrapped.(honorLabelsAppender)
|
||||||
|
if !ok {
|
||||||
|
t.Fatalf("Expected honorLabelsAppender but got %T", wrapped)
|
||||||
|
}
|
||||||
|
re, ok = hl.SampleAppender.(relabelAppender)
|
||||||
|
if !ok {
|
||||||
|
t.Fatalf("Expected relabelAppender but got %T", hl.SampleAppender)
|
||||||
|
}
|
||||||
|
if re.SampleAppender != app {
|
||||||
|
t.Fatalf("Expected base appender but got %T", re.SampleAppender)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestScrapeLoopStop(t *testing.T) {
|
||||||
|
scraper := &testScraper{}
|
||||||
|
sl := newScrapeLoop(context.Background(), scraper, nil, nil)
|
||||||
|
|
||||||
|
// The scrape pool synchronizes on stopping scrape loops. However, new scrape
|
||||||
|
// loops are syarted asynchronously. Thus it's possible, that a loop is stopped
|
||||||
|
// again before having started properly.
|
||||||
|
// Stopping not-yet-started loops must block until the run method was called and exited.
|
||||||
|
// The run method must exit immediately.
|
||||||
|
|
||||||
|
stopDone := make(chan struct{})
|
||||||
|
go func() {
|
||||||
|
sl.stop()
|
||||||
|
close(stopDone)
|
||||||
|
}()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-stopDone:
|
||||||
|
t.Fatalf("Stopping terminated before run exited successfully")
|
||||||
|
case <-time.After(500 * time.Millisecond):
|
||||||
|
}
|
||||||
|
|
||||||
|
// Running the scrape loop must exit before calling the scraper even once.
|
||||||
|
scraper.scrapeFunc = func(context.Context, time.Time) (model.Samples, error) {
|
||||||
|
t.Fatalf("scraper was called for terminated scrape loop")
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
runDone := make(chan struct{})
|
||||||
|
go func() {
|
||||||
|
sl.run(0, 0, nil)
|
||||||
|
close(runDone)
|
||||||
|
}()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-runDone:
|
||||||
|
case <-time.After(1 * time.Second):
|
||||||
|
t.Fatalf("Running terminated scrape loop did not exit")
|
||||||
|
}
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-stopDone:
|
||||||
|
case <-time.After(1 * time.Second):
|
||||||
|
t.Fatalf("Stopping did not terminate after running exited")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestScrapeLoopRun(t *testing.T) {
|
func TestScrapeLoopRun(t *testing.T) {
|
||||||
var (
|
var (
|
||||||
signal = make(chan struct{})
|
signal = make(chan struct{})
|
||||||
|
|
|
@ -121,13 +121,12 @@ type Target struct {
|
||||||
// The status object for the target. It is only set once on initialization.
|
// The status object for the target. It is only set once on initialization.
|
||||||
status *TargetStatus
|
status *TargetStatus
|
||||||
|
|
||||||
scrapeLoop *scrapeLoop
|
scrapeLoop *scrapeLoop
|
||||||
|
scrapeConfig *config.ScrapeConfig
|
||||||
|
|
||||||
// Mutex protects the members below.
|
// Mutex protects the members below.
|
||||||
sync.RWMutex
|
sync.RWMutex
|
||||||
|
|
||||||
scrapeConfig *config.ScrapeConfig
|
|
||||||
|
|
||||||
// Labels before any processing.
|
// Labels before any processing.
|
||||||
metaLabels model.LabelSet
|
metaLabels model.LabelSet
|
||||||
// Any labels that are added to this target and its metrics.
|
// Any labels that are added to this target and its metrics.
|
||||||
|
@ -230,20 +229,6 @@ func (t *Target) offset(interval time.Duration) time.Duration {
|
||||||
return time.Duration(next)
|
return time.Duration(next)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *Target) interval() time.Duration {
|
|
||||||
t.RLock()
|
|
||||||
defer t.RUnlock()
|
|
||||||
|
|
||||||
return time.Duration(t.scrapeConfig.ScrapeInterval)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (t *Target) timeout() time.Duration {
|
|
||||||
t.RLock()
|
|
||||||
defer t.RUnlock()
|
|
||||||
|
|
||||||
return time.Duration(t.scrapeConfig.ScrapeTimeout)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (t *Target) scheme() string {
|
func (t *Target) scheme() string {
|
||||||
t.RLock()
|
t.RLock()
|
||||||
defer t.RUnlock()
|
defer t.RUnlock()
|
||||||
|
@ -265,42 +250,6 @@ func (t *Target) path() string {
|
||||||
return string(t.labels[model.MetricsPathLabel])
|
return string(t.labels[model.MetricsPathLabel])
|
||||||
}
|
}
|
||||||
|
|
||||||
// wrapAppender wraps a SampleAppender for samples ingested from the target.
|
|
||||||
// RLock must be acquired by the caller.
|
|
||||||
func (t *Target) wrapAppender(app storage.SampleAppender) storage.SampleAppender {
|
|
||||||
// The relabelAppender has to be inside the label-modifying appenders
|
|
||||||
// so the relabeling rules are applied to the correct label set.
|
|
||||||
if mrc := t.scrapeConfig.MetricRelabelConfigs; len(mrc) > 0 {
|
|
||||||
app = relabelAppender{
|
|
||||||
SampleAppender: app,
|
|
||||||
relabelings: mrc,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if t.scrapeConfig.HonorLabels {
|
|
||||||
app = honorLabelsAppender{
|
|
||||||
SampleAppender: app,
|
|
||||||
labels: t.unlockedLabels(),
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
app = ruleLabelsAppender{
|
|
||||||
SampleAppender: app,
|
|
||||||
labels: t.unlockedLabels(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return app
|
|
||||||
}
|
|
||||||
|
|
||||||
// wrapReportingAppender wraps an appender for target status report samples.
|
|
||||||
// It ignores any relabeling rules set for the target.
|
|
||||||
// RLock must not be acquired by the caller.
|
|
||||||
func (t *Target) wrapReportingAppender(app storage.SampleAppender) storage.SampleAppender {
|
|
||||||
return ruleLabelsAppender{
|
|
||||||
SampleAppender: app,
|
|
||||||
labels: t.Labels(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// URL returns a copy of the target's URL.
|
// URL returns a copy of the target's URL.
|
||||||
func (t *Target) URL() *url.URL {
|
func (t *Target) URL() *url.URL {
|
||||||
t.RLock()
|
t.RLock()
|
||||||
|
|
|
@ -92,82 +92,6 @@ func TestTargetOffset(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestTargetWrapReportingAppender(t *testing.T) {
|
|
||||||
cfg := &config.ScrapeConfig{
|
|
||||||
MetricRelabelConfigs: []*config.RelabelConfig{
|
|
||||||
{}, {}, {},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
target := newTestTarget("example.com:80", 10*time.Millisecond, nil)
|
|
||||||
target.scrapeConfig = cfg
|
|
||||||
app := &nopAppender{}
|
|
||||||
|
|
||||||
cfg.HonorLabels = false
|
|
||||||
wrapped := target.wrapReportingAppender(app)
|
|
||||||
|
|
||||||
rl, ok := wrapped.(ruleLabelsAppender)
|
|
||||||
if !ok {
|
|
||||||
t.Fatalf("Expected ruleLabelsAppender but got %T", wrapped)
|
|
||||||
}
|
|
||||||
if rl.SampleAppender != app {
|
|
||||||
t.Fatalf("Expected base appender but got %T", rl.SampleAppender)
|
|
||||||
}
|
|
||||||
|
|
||||||
cfg.HonorLabels = true
|
|
||||||
wrapped = target.wrapReportingAppender(app)
|
|
||||||
|
|
||||||
hl, ok := wrapped.(ruleLabelsAppender)
|
|
||||||
if !ok {
|
|
||||||
t.Fatalf("Expected ruleLabelsAppender but got %T", wrapped)
|
|
||||||
}
|
|
||||||
if hl.SampleAppender != app {
|
|
||||||
t.Fatalf("Expected base appender but got %T", hl.SampleAppender)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestTargetWrapAppender(t *testing.T) {
|
|
||||||
cfg := &config.ScrapeConfig{
|
|
||||||
MetricRelabelConfigs: []*config.RelabelConfig{
|
|
||||||
{}, {}, {},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
target := newTestTarget("example.com:80", 10*time.Millisecond, nil)
|
|
||||||
target.scrapeConfig = cfg
|
|
||||||
app := &nopAppender{}
|
|
||||||
|
|
||||||
cfg.HonorLabels = false
|
|
||||||
wrapped := target.wrapAppender(app)
|
|
||||||
|
|
||||||
rl, ok := wrapped.(ruleLabelsAppender)
|
|
||||||
if !ok {
|
|
||||||
t.Fatalf("Expected ruleLabelsAppender but got %T", wrapped)
|
|
||||||
}
|
|
||||||
re, ok := rl.SampleAppender.(relabelAppender)
|
|
||||||
if !ok {
|
|
||||||
t.Fatalf("Expected relabelAppender but got %T", rl.SampleAppender)
|
|
||||||
}
|
|
||||||
if re.SampleAppender != app {
|
|
||||||
t.Fatalf("Expected base appender but got %T", re.SampleAppender)
|
|
||||||
}
|
|
||||||
|
|
||||||
cfg.HonorLabels = true
|
|
||||||
wrapped = target.wrapAppender(app)
|
|
||||||
|
|
||||||
hl, ok := wrapped.(honorLabelsAppender)
|
|
||||||
if !ok {
|
|
||||||
t.Fatalf("Expected honorLabelsAppender but got %T", wrapped)
|
|
||||||
}
|
|
||||||
re, ok = hl.SampleAppender.(relabelAppender)
|
|
||||||
if !ok {
|
|
||||||
t.Fatalf("Expected relabelAppender but got %T", hl.SampleAppender)
|
|
||||||
}
|
|
||||||
if re.SampleAppender != app {
|
|
||||||
t.Fatalf("Expected base appender but got %T", re.SampleAppender)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestTargetScrape404(t *testing.T) {
|
func TestTargetScrape404(t *testing.T) {
|
||||||
server := httptest.NewServer(
|
server := httptest.NewServer(
|
||||||
http.HandlerFunc(
|
http.HandlerFunc(
|
||||||
|
|
|
@ -117,6 +117,8 @@ func (tm *TargetManager) reload() {
|
||||||
ts.runScraping(tm.ctx)
|
ts.runScraping(tm.ctx)
|
||||||
tm.wg.Done()
|
tm.wg.Done()
|
||||||
}(ts)
|
}(ts)
|
||||||
|
} else {
|
||||||
|
ts.reload(scfg)
|
||||||
}
|
}
|
||||||
ts.runProviders(tm.ctx, providersFromConfig(scfg))
|
ts.runProviders(tm.ctx, providersFromConfig(scfg))
|
||||||
}
|
}
|
||||||
|
@ -140,12 +142,14 @@ func (tm *TargetManager) Pools() map[string][]*Target {
|
||||||
|
|
||||||
// TODO(fabxc): this is just a hack to maintain compatibility for now.
|
// TODO(fabxc): this is just a hack to maintain compatibility for now.
|
||||||
for _, ps := range tm.targetSets {
|
for _, ps := range tm.targetSets {
|
||||||
for _, ts := range ps.scrapePool.tgroups {
|
ps.scrapePool.mtx.RLock()
|
||||||
for _, t := range ts {
|
|
||||||
job := string(t.Labels()[model.JobLabel])
|
for _, t := range ps.scrapePool.targets {
|
||||||
pools[job] = append(pools[job], t)
|
job := string(t.Labels()[model.JobLabel])
|
||||||
}
|
pools[job] = append(pools[job], t)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ps.scrapePool.mtx.RUnlock()
|
||||||
}
|
}
|
||||||
return pools
|
return pools
|
||||||
}
|
}
|
||||||
|
@ -166,10 +170,12 @@ func (tm *TargetManager) ApplyConfig(cfg *config.Config) bool {
|
||||||
}
|
}
|
||||||
|
|
||||||
// targetSet holds several TargetProviders for which the same scrape configuration
|
// targetSet holds several TargetProviders for which the same scrape configuration
|
||||||
// is used. It runs the target providers and starts and stops scrapers as it
|
// is used. It maintains target groups from all given providers and sync them
|
||||||
// receives target updates.
|
// to a scrape pool.
|
||||||
type targetSet struct {
|
type targetSet struct {
|
||||||
mtx sync.RWMutex
|
mtx sync.RWMutex
|
||||||
|
|
||||||
|
// Sets of targets by a source string that is unique across target providers.
|
||||||
tgroups map[string]map[model.Fingerprint]*Target
|
tgroups map[string]map[model.Fingerprint]*Target
|
||||||
providers map[string]TargetProvider
|
providers map[string]TargetProvider
|
||||||
|
|
||||||
|
@ -184,7 +190,7 @@ type targetSet struct {
|
||||||
func newTargetSet(cfg *config.ScrapeConfig, app storage.SampleAppender) *targetSet {
|
func newTargetSet(cfg *config.ScrapeConfig, app storage.SampleAppender) *targetSet {
|
||||||
ts := &targetSet{
|
ts := &targetSet{
|
||||||
tgroups: map[string]map[model.Fingerprint]*Target{},
|
tgroups: map[string]map[model.Fingerprint]*Target{},
|
||||||
scrapePool: newScrapePool(app),
|
scrapePool: newScrapePool(cfg, app),
|
||||||
syncCh: make(chan struct{}, 1),
|
syncCh: make(chan struct{}, 1),
|
||||||
config: cfg,
|
config: cfg,
|
||||||
}
|
}
|
||||||
|
@ -203,6 +209,14 @@ func (ts *targetSet) cancel() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (ts *targetSet) reload(cfg *config.ScrapeConfig) {
|
||||||
|
ts.mtx.Lock()
|
||||||
|
ts.config = cfg
|
||||||
|
ts.mtx.Unlock()
|
||||||
|
|
||||||
|
ts.scrapePool.reload(cfg)
|
||||||
|
}
|
||||||
|
|
||||||
func (ts *targetSet) runScraping(ctx context.Context) {
|
func (ts *targetSet) runScraping(ctx context.Context) {
|
||||||
ctx, ts.cancelScraping = context.WithCancel(ctx)
|
ctx, ts.cancelScraping = context.WithCancel(ctx)
|
||||||
|
|
||||||
|
@ -221,7 +235,9 @@ Loop:
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
break Loop
|
break Loop
|
||||||
case <-ts.syncCh:
|
case <-ts.syncCh:
|
||||||
|
ts.mtx.RLock()
|
||||||
ts.sync()
|
ts.sync()
|
||||||
|
ts.mtx.RUnlock()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -231,9 +247,13 @@ Loop:
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ts *targetSet) sync() {
|
func (ts *targetSet) sync() {
|
||||||
// TODO(fabxc): temporary simple version. For a deduplicating scrape pool we will
|
targets := []*Target{}
|
||||||
// submit a list of all targets.
|
for _, tgroup := range ts.tgroups {
|
||||||
ts.scrapePool.sync(ts.tgroups)
|
for _, t := range tgroup {
|
||||||
|
targets = append(targets, t)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
ts.scrapePool.sync(targets)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ts *targetSet) runProviders(ctx context.Context, providers map[string]TargetProvider) {
|
func (ts *targetSet) runProviders(ctx context.Context, providers map[string]TargetProvider) {
|
||||||
|
@ -298,8 +318,9 @@ func (ts *targetSet) runProviders(ctx context.Context, providers map[string]Targ
|
||||||
go prov.Run(ctx, updates)
|
go prov.Run(ctx, updates)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// We wait for a full initial set of target groups before releasing the mutex
|
||||||
|
// to ensure the initial sync is complete and there are no races with subsequent updates.
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
|
|
||||||
ts.sync()
|
ts.sync()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue