diff --git a/config/config.go b/config/config.go index 75b946620..697638236 100644 --- a/config/config.go +++ b/config/config.go @@ -16,10 +16,14 @@ package config import ( "code.google.com/p/goprotobuf/proto" "fmt" - pb "github.com/prometheus/prometheus/config/generated" - "github.com/prometheus/prometheus/utility" "regexp" "time" + + clientmodel "github.com/prometheus/client_golang/model" + + pb "github.com/prometheus/prometheus/config/generated" + + "github.com/prometheus/prometheus/utility" ) var jobNameRE = regexp.MustCompile("^[a-zA-Z_][a-zA-Z0-9_-]*$") @@ -98,6 +102,17 @@ func (c Config) GetJobByName(name string) *JobConfig { return nil } +// Return the global labels as a LabelSet. +func (c Config) GlobalLabels() clientmodel.LabelSet { + labels := clientmodel.LabelSet{} + if c.Global.Labels != nil { + for _, label := range c.Global.Labels.Label { + labels[clientmodel.LabelName(label.GetName())] = clientmodel.LabelValue(label.GetValue()) + } + } + return labels +} + // Jobs returns all the jobs in a Config object. func (c Config) Jobs() (jobs []JobConfig) { for _, job := range c.Job { diff --git a/main.go b/main.go index 858a24ae7..7b5c16b48 100644 --- a/main.go +++ b/main.go @@ -23,6 +23,8 @@ import ( "github.com/golang/glog" "github.com/prometheus/client_golang/extraction" + clientmodel "github.com/prometheus/client_golang/model" + "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/notification" "github.com/prometheus/prometheus/retrieval" @@ -181,6 +183,12 @@ func main() { } unwrittenSamples := make(chan *extraction.Result, *samplesQueueCapacity) + ingester := &retrieval.MergeLabelsIngester{ + Labels: conf.GlobalLabels(), + CollisionPrefix: clientmodel.ExporterLabelPrefix, + + Ingester: retrieval.ChannelIngester(unwrittenSamples), + } curationState := make(chan metric.CurationState, 1) // Coprime numbers, fool! headCompactionTimer := time.NewTicker(*headCompactInterval) @@ -189,7 +197,7 @@ func main() { deletionTimer := time.NewTicker(*deleteInterval) // Queue depth will need to be exposed - targetManager := retrieval.NewTargetManager(unwrittenSamples, *concurrentRetrievalAllowance) + targetManager := retrieval.NewTargetManager(ingester, *concurrentRetrievalAllowance) targetManager.AddTargetsFromConfig(conf) notifications := make(chan notification.NotificationReqs, *notificationQueueCapacity) diff --git a/retrieval/helpers_test.go b/retrieval/helpers_test.go index 61f829ea8..3634c2d45 100644 --- a/retrieval/helpers_test.go +++ b/retrieval/helpers_test.go @@ -15,6 +15,8 @@ package retrieval import ( "time" + + "github.com/prometheus/client_golang/extraction" ) type literalScheduler time.Time @@ -25,3 +27,9 @@ func (s literalScheduler) ScheduledFor() time.Time { func (s literalScheduler) Reschedule(earliest time.Time, future TargetState) { } + +type nopIngester struct{} + +func (i nopIngester) Ingest(*extraction.Result) error { + return nil +} diff --git a/retrieval/ingester.go b/retrieval/ingester.go new file mode 100644 index 000000000..667a58328 --- /dev/null +++ b/retrieval/ingester.go @@ -0,0 +1,46 @@ +// Copyright 2013 Prometheus Team +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package retrieval + +import ( + "github.com/prometheus/client_golang/extraction" + + clientmodel "github.com/prometheus/client_golang/model" +) + +// MergeLabelsIngester merges a labelset ontop of a given extraction result and +// passes the result on to another ingester. Label collisions are avoided by +// appending a label prefix to any newly merged colliding labels. +type MergeLabelsIngester struct { + Labels clientmodel.LabelSet + CollisionPrefix clientmodel.LabelName + + Ingester extraction.Ingester +} + +func (i *MergeLabelsIngester) Ingest(r *extraction.Result) error { + for _, s := range r.Samples { + s.Metric.MergeFromLabelSet(i.Labels, i.CollisionPrefix) + } + + return i.Ingester.Ingest(r) +} + +// ChannelIngester feeds results into a channel without modifying them. +type ChannelIngester chan<- *extraction.Result + +func (i ChannelIngester) Ingest(r *extraction.Result) error { + i <- r + return nil +} diff --git a/retrieval/target.go b/retrieval/target.go index db60bf842..72b4ad68a 100644 --- a/retrieval/target.go +++ b/retrieval/target.go @@ -91,7 +91,7 @@ type Target interface { // alluded to in the scheduledFor function, to use this as it wants to. The // current use case is to create a common batching time for scraping multiple // Targets in the future through the TargetPool. - Scrape(earliest time.Time, results chan<- *extraction.Result) error + Scrape(earliest time.Time, ingester extraction.Ingester) error // Fulfill the healthReporter interface. State() TargetState // Report the soonest time at which this Target may be scheduled for @@ -156,7 +156,7 @@ func NewTarget(address string, deadline time.Duration, baseLabels clientmodel.La return target } -func (t *target) recordScrapeHealth(results chan<- *extraction.Result, timestamp time.Time, healthy bool) { +func (t *target) recordScrapeHealth(ingester extraction.Ingester, timestamp time.Time, healthy bool) { metric := clientmodel.Metric{} for label, value := range t.baseLabels { metric[label] = value @@ -175,21 +175,21 @@ func (t *target) recordScrapeHealth(results chan<- *extraction.Result, timestamp Value: healthValue, } - results <- &extraction.Result{ + ingester.Ingest(&extraction.Result{ Err: nil, Samples: clientmodel.Samples{sample}, - } + }) } -func (t *target) Scrape(earliest time.Time, results chan<- *extraction.Result) error { +func (t *target) Scrape(earliest time.Time, ingester extraction.Ingester) error { now := time.Now() futureState := t.state - err := t.scrape(now, results) + err := t.scrape(now, ingester) if err != nil { - t.recordScrapeHealth(results, now, false) + t.recordScrapeHealth(ingester, now, false) futureState = UNREACHABLE } else { - t.recordScrapeHealth(results, now, true) + t.recordScrapeHealth(ingester, now, true) futureState = ALIVE } @@ -202,29 +202,7 @@ func (t *target) Scrape(earliest time.Time, results chan<- *extraction.Result) e const acceptHeader = `application/vnd.google.protobuf;proto=io.prometheus.client.MetricFamily;encoding=delimited;q=0.7,application/json;schema=prometheus/telemetry;version=0.0.2;q=0.2,*/*;q=0.1` -type channelIngester chan<- *extraction.Result - -func (i channelIngester) Ingest(r *extraction.Result) error { - i <- r - - return nil -} - -type extendLabelsIngester struct { - baseLabels clientmodel.LabelSet - - i extraction.Ingester -} - -func (i *extendLabelsIngester) Ingest(r *extraction.Result) error { - for _, s := range r.Samples { - s.Metric.MergeFromLabelSet(i.baseLabels, clientmodel.ExporterLabelPrefix) - } - - return i.i.Ingest(r) -} - -func (t *target) scrape(timestamp time.Time, results chan<- *extraction.Result) (err error) { +func (t *target) scrape(timestamp time.Time, ingester extraction.Ingester) (err error) { defer func(start time.Time) { ms := float64(time.Since(start)) / float64(time.Millisecond) labels := map[string]string{address: t.Address(), outcome: success} @@ -268,15 +246,16 @@ func (t *target) scrape(timestamp time.Time, results chan<- *extraction.Result) return err } - ingester := &extendLabelsIngester{ - baseLabels: baseLabels, + i := &MergeLabelsIngester{ + Labels: baseLabels, + CollisionPrefix: clientmodel.ExporterLabelPrefix, - i: channelIngester(results), + Ingester: ingester, } processOptions := &extraction.ProcessOptions{ Timestamp: timestamp, } - return processor.ProcessSingle(buf, ingester, processOptions) + return processor.ProcessSingle(buf, i, processOptions) } func (t *target) State() TargetState { diff --git a/retrieval/target_test.go b/retrieval/target_test.go index a6cab0206..e2aca56a5 100644 --- a/retrieval/target_test.go +++ b/retrieval/target_test.go @@ -26,6 +26,15 @@ import ( "github.com/prometheus/prometheus/utility" ) +type collectResultIngester struct { + result *extraction.Result +} + +func (i *collectResultIngester) Ingest(r *extraction.Result) error { + i.result = r + return nil +} + func TestTargetScrapeUpdatesState(t *testing.T) { testTarget := target{ scheduler: literalScheduler{}, @@ -33,7 +42,7 @@ func TestTargetScrapeUpdatesState(t *testing.T) { address: "bad schema", httpClient: utility.NewDeadlineClient(0), } - testTarget.Scrape(time.Time{}, make(chan *extraction.Result, 2)) + testTarget.Scrape(time.Time{}, nopIngester{}) if testTarget.state != UNREACHABLE { t.Errorf("Expected target state %v, actual: %v", UNREACHABLE, testTarget.state) } @@ -48,10 +57,10 @@ func TestTargetRecordScrapeHealth(t *testing.T) { } now := time.Now() - results := make(chan *extraction.Result) - go testTarget.recordScrapeHealth(results, now, true) + ingester := &collectResultIngester{} + testTarget.recordScrapeHealth(ingester, now, true) - result := <-results + result := ingester.result if len(result.Samples) != 1 { t.Fatalf("Expected one sample, got %d", len(result.Samples)) @@ -88,11 +97,11 @@ func TestTargetScrapeTimeout(t *testing.T) { defer server.Close() testTarget := NewTarget(server.URL, 10*time.Millisecond, clientmodel.LabelSet{}) - results := make(chan *extraction.Result, 1024) + ingester := nopIngester{} // scrape once without timeout signal <- true - if err := testTarget.Scrape(time.Now(), results); err != nil { + if err := testTarget.Scrape(time.Now(), ingester); err != nil { t.Fatal(err) } @@ -101,12 +110,12 @@ func TestTargetScrapeTimeout(t *testing.T) { // now scrape again signal <- true - if err := testTarget.Scrape(time.Now(), results); err != nil { + if err := testTarget.Scrape(time.Now(), ingester); err != nil { t.Fatal(err) } // now timeout - if err := testTarget.Scrape(time.Now(), results); err == nil { + if err := testTarget.Scrape(time.Now(), ingester); err == nil { t.Fatal("expected scrape to timeout") } else { signal <- true // let handler continue @@ -114,7 +123,7 @@ func TestTargetScrapeTimeout(t *testing.T) { // now scrape again without timeout signal <- true - if err := testTarget.Scrape(time.Now(), results); err != nil { + if err := testTarget.Scrape(time.Now(), ingester); err != nil { t.Fatal(err) } } diff --git a/retrieval/targetmanager.go b/retrieval/targetmanager.go index eae4dc47b..675f02e4a 100644 --- a/retrieval/targetmanager.go +++ b/retrieval/targetmanager.go @@ -37,13 +37,13 @@ type TargetManager interface { type targetManager struct { requestAllowance chan bool poolsByJob map[string]*TargetPool - results chan<- *extraction.Result + ingester extraction.Ingester } -func NewTargetManager(results chan<- *extraction.Result, requestAllowance int) TargetManager { +func NewTargetManager(ingester extraction.Ingester, requestAllowance int) TargetManager { return &targetManager{ requestAllowance: make(chan bool, requestAllowance), - results: results, + ingester: ingester, poolsByJob: make(map[string]*TargetPool), } } @@ -71,7 +71,7 @@ func (m *targetManager) TargetPoolForJob(job config.JobConfig) *TargetPool { interval := job.ScrapeInterval() m.poolsByJob[job.GetName()] = targetPool // BUG(all): Investigate whether this auto-goroutine creation is desired. - go targetPool.Run(m.results, interval) + go targetPool.Run(m.ingester, interval) } return targetPool diff --git a/retrieval/targetmanager_test.go b/retrieval/targetmanager_test.go index 08115a3f1..501626194 100644 --- a/retrieval/targetmanager_test.go +++ b/retrieval/targetmanager_test.go @@ -56,7 +56,7 @@ func (t fakeTarget) Interval() time.Duration { return t.interval } -func (t *fakeTarget) Scrape(e time.Time, r chan<- *extraction.Result) error { +func (t *fakeTarget) Scrape(e time.Time, i extraction.Ingester) error { t.scrapeCount++ return nil @@ -78,8 +78,7 @@ func (t *fakeTarget) Merge(newTarget Target) {} func (t *fakeTarget) EstimatedTimeToExecute() time.Duration { return 0 } func testTargetManager(t test.Tester) { - results := make(chan *extraction.Result, 5) - targetManager := NewTargetManager(results, 3) + targetManager := NewTargetManager(nopIngester{}, 3) testJob1 := config.JobConfig{ JobConfig: pb.JobConfig{ Name: proto.String("test_job1"), diff --git a/retrieval/targetpool.go b/retrieval/targetpool.go index 5480b3d97..c1ba2e5f8 100644 --- a/retrieval/targetpool.go +++ b/retrieval/targetpool.go @@ -50,14 +50,14 @@ func NewTargetPool(m TargetManager, p TargetProvider) *TargetPool { } } -func (p *TargetPool) Run(results chan<- *extraction.Result, interval time.Duration) { +func (p *TargetPool) Run(ingester extraction.Ingester, interval time.Duration) { ticker := time.NewTicker(interval) defer ticker.Stop() for { select { case <-ticker.C: - p.runIteration(results, interval) + p.runIteration(ingester, interval) case newTarget := <-p.addTargetQueue: p.addTarget(newTarget) case newTargets := <-p.replaceTargetsQueue: @@ -116,14 +116,14 @@ func (p *TargetPool) replaceTargets(newTargets []Target) { p.targets = newTargets } -func (p *TargetPool) runSingle(earliest time.Time, results chan<- *extraction.Result, t Target) { +func (p *TargetPool) runSingle(earliest time.Time, ingester extraction.Ingester, t Target) { p.manager.acquire() defer p.manager.release() - t.Scrape(earliest, results) + t.Scrape(earliest, ingester) } -func (p *TargetPool) runIteration(results chan<- *extraction.Result, interval time.Duration) { +func (p *TargetPool) runIteration(ingester extraction.Ingester, interval time.Duration) { if p.targetProvider != nil { targets, err := p.targetProvider.Targets() if err != nil { @@ -155,7 +155,7 @@ func (p *TargetPool) runIteration(results chan<- *extraction.Result, interval ti wait.Add(1) go func(t Target) { - p.runSingle(now, results, t) + p.runSingle(now, ingester, t) wait.Done() }(target) } diff --git a/retrieval/targetpool_test.go b/retrieval/targetpool_test.go index 982ffeeb8..54897721f 100644 --- a/retrieval/targetpool_test.go +++ b/retrieval/targetpool_test.go @@ -18,8 +18,6 @@ import ( "testing" "time" - "github.com/prometheus/client_golang/extraction" - "github.com/prometheus/prometheus/utility/test" ) @@ -150,7 +148,7 @@ func TestTargetPoolIterationWithUnhealthyTargetsFinishes(t *testing.T) { done := make(chan bool) go func() { - pool.runIteration(make(chan *extraction.Result), time.Duration(0)) + pool.runIteration(nopIngester{}, time.Duration(0)) done <- true }()