retrieval: use separate appender per target

This commit is contained in:
Fabian Reinartz 2016-12-30 21:35:35 +01:00
parent 61bd698143
commit e631a1260d
5 changed files with 53 additions and 30 deletions

View file

@ -96,7 +96,7 @@ func Main() int {
var (
notifier = notifier.New(&cfg.notifier)
targetManager = retrieval.NewTargetManager(sampleAppender)
targetManager = retrieval.NewTargetManager(localStorage)
queryEngine = promql.NewEngine(localStorage, &cfg.queryEngine)
ctx, cancelCtx = context.WithCancel(context.Background())
)

View file

@ -16,8 +16,15 @@ package retrieval
import (
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/storage"
)
type nopAppendable struct{}
func (a nopAppendable) Appender() (storage.Appender, error) {
return nopAppender{}, nil
}
type nopAppender struct{}
func (a nopAppender) Add(l labels.Labels, t int64, v float64) error { return nil }

View file

@ -89,7 +89,7 @@ func init() {
// scrapePool manages scrapes for sets of targets.
type scrapePool struct {
appender storage.Appender
appendable Appendable
ctx context.Context
@ -105,20 +105,20 @@ type scrapePool struct {
newLoop func(context.Context, scraper, storage.Appender, storage.Appender) loop
}
func newScrapePool(ctx context.Context, cfg *config.ScrapeConfig, app storage.Appender) *scrapePool {
func newScrapePool(ctx context.Context, cfg *config.ScrapeConfig, app Appendable) *scrapePool {
client, err := NewHTTPClient(cfg.HTTPClientConfig)
if err != nil {
// Any errors that could occur here should be caught during config validation.
log.Errorf("Error creating HTTP client for job %q: %s", cfg.JobName, err)
}
return &scrapePool{
appender: app,
config: cfg,
ctx: ctx,
client: client,
targets: map[uint64]*Target{},
loops: map[uint64]loop{},
newLoop: newScrapeLoop,
appendable: app,
config: cfg,
ctx: ctx,
client: client,
targets: map[uint64]*Target{},
loops: map[uint64]loop{},
newLoop: newScrapeLoop,
}
}
@ -266,7 +266,10 @@ func (sp *scrapePool) sync(targets []*Target) {
// sampleAppender returns an appender for ingested samples from the target.
func (sp *scrapePool) sampleAppender(target *Target) storage.Appender {
app := sp.appender
app, err := sp.appendable.Appender()
if err != nil {
panic(err)
}
// The relabelAppender has to be inside the label-modifying appenders
// so the relabeling rules are applied to the correct label set.
if mrc := sp.config.MetricRelabelConfigs; len(mrc) > 0 {
@ -292,8 +295,12 @@ func (sp *scrapePool) sampleAppender(target *Target) storage.Appender {
// reportAppender returns an appender for reporting samples for the target.
func (sp *scrapePool) reportAppender(target *Target) storage.Appender {
app, err := sp.appendable.Appender()
if err != nil {
panic(err)
}
return ruleLabelsAppender{
Appender: sp.appender,
Appender: app,
labels: target.Labels(),
}
}
@ -524,12 +531,16 @@ func (sl *scrapeLoop) report(start time.Time, duration time.Duration, scrapedSam
)
if err := sl.reportAppender.Add(healthMet, ts, health); err != nil {
log.With("error", err).Warn("Scrape health sample discarded")
log.With("err", err).Warn("Scrape health sample discarded")
}
if err := sl.reportAppender.Add(durationMet, ts, duration.Seconds()); err != nil {
log.With("error", err).Warn("Scrape duration sample discarded")
log.With("err", err).Warn("Scrape duration sample discarded")
}
if err := sl.reportAppender.Add(countMet, ts, float64(scrapedSamples)); err != nil {
log.With("error", err).Warn("Scrape sample count sample discarded")
log.With("err", err).Warn("Scrape sample count sample discarded")
}
if err := sl.reportAppender.Commit(); err != nil {
log.With("err", err).Warn("Commiting report samples failed")
}
}

View file

@ -36,12 +36,12 @@ import (
func TestNewScrapePool(t *testing.T) {
var (
app = &nopAppender{}
app = &nopAppendable{}
cfg = &config.ScrapeConfig{}
sp = newScrapePool(context.Background(), cfg, app)
)
if a, ok := sp.appender.(*nopAppender); !ok || a != app {
if a, ok := sp.appendable.(*nopAppendable); !ok || a != app {
t.Fatalf("Wrong sample appender")
}
if sp.config != cfg {
@ -157,9 +157,10 @@ func TestScrapePoolReload(t *testing.T) {
return l
}
sp := &scrapePool{
targets: map[uint64]*Target{},
loops: map[uint64]loop{},
newLoop: newLoop,
appendable: &nopAppendable{},
targets: map[uint64]*Target{},
loops: map[uint64]loop{},
newLoop: newLoop,
}
// Reloading a scrape pool with a new scrape configuration must stop all scrape
@ -227,7 +228,7 @@ func TestScrapePoolReportAppender(t *testing.T) {
},
}
target := newTestTarget("example.com:80", 10*time.Millisecond, nil)
app := &nopAppender{}
app := &nopAppendable{}
sp := newScrapePool(context.Background(), cfg, app)
@ -238,7 +239,7 @@ func TestScrapePoolReportAppender(t *testing.T) {
if !ok {
t.Fatalf("Expected ruleLabelsAppender but got %T", wrapped)
}
if rl.Appender != app {
if _, ok := rl.Appender.(nopAppender); !ok {
t.Fatalf("Expected base appender but got %T", rl.Appender)
}
@ -249,7 +250,7 @@ func TestScrapePoolReportAppender(t *testing.T) {
if !ok {
t.Fatalf("Expected ruleLabelsAppender but got %T", wrapped)
}
if hl.Appender != app {
if _, ok := rl.Appender.(nopAppender); !ok {
t.Fatalf("Expected base appender but got %T", hl.Appender)
}
}
@ -262,7 +263,7 @@ func TestScrapePoolSampleAppender(t *testing.T) {
}
target := newTestTarget("example.com:80", 10*time.Millisecond, nil)
app := &nopAppender{}
app := &nopAppendable{}
sp := newScrapePool(context.Background(), cfg, app)
@ -277,7 +278,7 @@ func TestScrapePoolSampleAppender(t *testing.T) {
if !ok {
t.Fatalf("Expected relabelAppender but got %T", rl.Appender)
}
if re.Appender != app {
if _, ok := re.Appender.(nopAppender); !ok {
t.Fatalf("Expected base appender but got %T", re.Appender)
}
@ -292,7 +293,7 @@ func TestScrapePoolSampleAppender(t *testing.T) {
if !ok {
t.Fatalf("Expected relabelAppender but got %T", hl.Appender)
}
if re.Appender != app {
if _, ok := re.Appender.(nopAppender); !ok {
t.Fatalf("Expected base appender but got %T", re.Appender)
}
}

View file

@ -28,7 +28,7 @@ import (
// creates the new targets based on the target groups it receives from various
// target providers.
type TargetManager struct {
appender storage.Appender
append Appendable
scrapeConfigs []*config.ScrapeConfig
mtx sync.RWMutex
@ -48,10 +48,14 @@ type targetSet struct {
sp *scrapePool
}
type Appendable interface {
Appender() (storage.Appender, error)
}
// NewTargetManager creates a new TargetManager.
func NewTargetManager(app storage.Appender) *TargetManager {
func NewTargetManager(app Appendable) *TargetManager {
return &TargetManager{
appender: app,
append: app,
targetSets: map[string]*targetSet{},
}
}
@ -100,7 +104,7 @@ func (tm *TargetManager) reload() {
ts = &targetSet{
ctx: ctx,
cancel: cancel,
sp: newScrapePool(ctx, scfg, tm.appender),
sp: newScrapePool(ctx, scfg, tm.append),
}
ts.ts = discovery.NewTargetSet(ts.sp)