Add global label support via Ingesters.

This commit is contained in:
Julius Volz 2013-08-13 16:14:18 +02:00
parent 417e6ffa0c
commit d69b85e6c9
10 changed files with 125 additions and 63 deletions

View file

@ -16,10 +16,14 @@ package config
import (
"code.google.com/p/goprotobuf/proto"
"fmt"
pb "github.com/prometheus/prometheus/config/generated"
"github.com/prometheus/prometheus/utility"
"regexp"
"time"
clientmodel "github.com/prometheus/client_golang/model"
pb "github.com/prometheus/prometheus/config/generated"
"github.com/prometheus/prometheus/utility"
)
var jobNameRE = regexp.MustCompile("^[a-zA-Z_][a-zA-Z0-9_-]*$")
@ -98,6 +102,17 @@ func (c Config) GetJobByName(name string) *JobConfig {
return nil
}
// Return the global labels as a LabelSet.
func (c Config) GlobalLabels() clientmodel.LabelSet {
labels := clientmodel.LabelSet{}
if c.Global.Labels != nil {
for _, label := range c.Global.Labels.Label {
labels[clientmodel.LabelName(label.GetName())] = clientmodel.LabelValue(label.GetValue())
}
}
return labels
}
// Jobs returns all the jobs in a Config object.
func (c Config) Jobs() (jobs []JobConfig) {
for _, job := range c.Job {

10
main.go
View file

@ -23,6 +23,8 @@ import (
"github.com/golang/glog"
"github.com/prometheus/client_golang/extraction"
clientmodel "github.com/prometheus/client_golang/model"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/notification"
"github.com/prometheus/prometheus/retrieval"
@ -181,6 +183,12 @@ func main() {
}
unwrittenSamples := make(chan *extraction.Result, *samplesQueueCapacity)
ingester := &retrieval.MergeLabelsIngester{
Labels: conf.GlobalLabels(),
CollisionPrefix: clientmodel.ExporterLabelPrefix,
Ingester: retrieval.ChannelIngester(unwrittenSamples),
}
curationState := make(chan metric.CurationState, 1)
// Coprime numbers, fool!
headCompactionTimer := time.NewTicker(*headCompactInterval)
@ -189,7 +197,7 @@ func main() {
deletionTimer := time.NewTicker(*deleteInterval)
// Queue depth will need to be exposed
targetManager := retrieval.NewTargetManager(unwrittenSamples, *concurrentRetrievalAllowance)
targetManager := retrieval.NewTargetManager(ingester, *concurrentRetrievalAllowance)
targetManager.AddTargetsFromConfig(conf)
notifications := make(chan notification.NotificationReqs, *notificationQueueCapacity)

View file

@ -15,6 +15,8 @@ package retrieval
import (
"time"
"github.com/prometheus/client_golang/extraction"
)
type literalScheduler time.Time
@ -25,3 +27,9 @@ func (s literalScheduler) ScheduledFor() time.Time {
func (s literalScheduler) Reschedule(earliest time.Time, future TargetState) {
}
type nopIngester struct{}
func (i nopIngester) Ingest(*extraction.Result) error {
return nil
}

46
retrieval/ingester.go Normal file
View file

@ -0,0 +1,46 @@
// Copyright 2013 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package retrieval
import (
"github.com/prometheus/client_golang/extraction"
clientmodel "github.com/prometheus/client_golang/model"
)
// MergeLabelsIngester merges a labelset ontop of a given extraction result and
// passes the result on to another ingester. Label collisions are avoided by
// appending a label prefix to any newly merged colliding labels.
type MergeLabelsIngester struct {
Labels clientmodel.LabelSet
CollisionPrefix clientmodel.LabelName
Ingester extraction.Ingester
}
func (i *MergeLabelsIngester) Ingest(r *extraction.Result) error {
for _, s := range r.Samples {
s.Metric.MergeFromLabelSet(i.Labels, i.CollisionPrefix)
}
return i.Ingester.Ingest(r)
}
// ChannelIngester feeds results into a channel without modifying them.
type ChannelIngester chan<- *extraction.Result
func (i ChannelIngester) Ingest(r *extraction.Result) error {
i <- r
return nil
}

View file

@ -91,7 +91,7 @@ type Target interface {
// alluded to in the scheduledFor function, to use this as it wants to. The
// current use case is to create a common batching time for scraping multiple
// Targets in the future through the TargetPool.
Scrape(earliest time.Time, results chan<- *extraction.Result) error
Scrape(earliest time.Time, ingester extraction.Ingester) error
// Fulfill the healthReporter interface.
State() TargetState
// Report the soonest time at which this Target may be scheduled for
@ -156,7 +156,7 @@ func NewTarget(address string, deadline time.Duration, baseLabels clientmodel.La
return target
}
func (t *target) recordScrapeHealth(results chan<- *extraction.Result, timestamp time.Time, healthy bool) {
func (t *target) recordScrapeHealth(ingester extraction.Ingester, timestamp time.Time, healthy bool) {
metric := clientmodel.Metric{}
for label, value := range t.baseLabels {
metric[label] = value
@ -175,21 +175,21 @@ func (t *target) recordScrapeHealth(results chan<- *extraction.Result, timestamp
Value: healthValue,
}
results <- &extraction.Result{
ingester.Ingest(&extraction.Result{
Err: nil,
Samples: clientmodel.Samples{sample},
}
})
}
func (t *target) Scrape(earliest time.Time, results chan<- *extraction.Result) error {
func (t *target) Scrape(earliest time.Time, ingester extraction.Ingester) error {
now := time.Now()
futureState := t.state
err := t.scrape(now, results)
err := t.scrape(now, ingester)
if err != nil {
t.recordScrapeHealth(results, now, false)
t.recordScrapeHealth(ingester, now, false)
futureState = UNREACHABLE
} else {
t.recordScrapeHealth(results, now, true)
t.recordScrapeHealth(ingester, now, true)
futureState = ALIVE
}
@ -202,29 +202,7 @@ func (t *target) Scrape(earliest time.Time, results chan<- *extraction.Result) e
const acceptHeader = `application/vnd.google.protobuf;proto=io.prometheus.client.MetricFamily;encoding=delimited;q=0.7,application/json;schema=prometheus/telemetry;version=0.0.2;q=0.2,*/*;q=0.1`
type channelIngester chan<- *extraction.Result
func (i channelIngester) Ingest(r *extraction.Result) error {
i <- r
return nil
}
type extendLabelsIngester struct {
baseLabels clientmodel.LabelSet
i extraction.Ingester
}
func (i *extendLabelsIngester) Ingest(r *extraction.Result) error {
for _, s := range r.Samples {
s.Metric.MergeFromLabelSet(i.baseLabels, clientmodel.ExporterLabelPrefix)
}
return i.i.Ingest(r)
}
func (t *target) scrape(timestamp time.Time, results chan<- *extraction.Result) (err error) {
func (t *target) scrape(timestamp time.Time, ingester extraction.Ingester) (err error) {
defer func(start time.Time) {
ms := float64(time.Since(start)) / float64(time.Millisecond)
labels := map[string]string{address: t.Address(), outcome: success}
@ -268,15 +246,16 @@ func (t *target) scrape(timestamp time.Time, results chan<- *extraction.Result)
return err
}
ingester := &extendLabelsIngester{
baseLabels: baseLabels,
i := &MergeLabelsIngester{
Labels: baseLabels,
CollisionPrefix: clientmodel.ExporterLabelPrefix,
i: channelIngester(results),
Ingester: ingester,
}
processOptions := &extraction.ProcessOptions{
Timestamp: timestamp,
}
return processor.ProcessSingle(buf, ingester, processOptions)
return processor.ProcessSingle(buf, i, processOptions)
}
func (t *target) State() TargetState {

View file

@ -26,6 +26,15 @@ import (
"github.com/prometheus/prometheus/utility"
)
type collectResultIngester struct {
result *extraction.Result
}
func (i *collectResultIngester) Ingest(r *extraction.Result) error {
i.result = r
return nil
}
func TestTargetScrapeUpdatesState(t *testing.T) {
testTarget := target{
scheduler: literalScheduler{},
@ -33,7 +42,7 @@ func TestTargetScrapeUpdatesState(t *testing.T) {
address: "bad schema",
httpClient: utility.NewDeadlineClient(0),
}
testTarget.Scrape(time.Time{}, make(chan *extraction.Result, 2))
testTarget.Scrape(time.Time{}, nopIngester{})
if testTarget.state != UNREACHABLE {
t.Errorf("Expected target state %v, actual: %v", UNREACHABLE, testTarget.state)
}
@ -48,10 +57,10 @@ func TestTargetRecordScrapeHealth(t *testing.T) {
}
now := time.Now()
results := make(chan *extraction.Result)
go testTarget.recordScrapeHealth(results, now, true)
ingester := &collectResultIngester{}
testTarget.recordScrapeHealth(ingester, now, true)
result := <-results
result := ingester.result
if len(result.Samples) != 1 {
t.Fatalf("Expected one sample, got %d", len(result.Samples))
@ -88,11 +97,11 @@ func TestTargetScrapeTimeout(t *testing.T) {
defer server.Close()
testTarget := NewTarget(server.URL, 10*time.Millisecond, clientmodel.LabelSet{})
results := make(chan *extraction.Result, 1024)
ingester := nopIngester{}
// scrape once without timeout
signal <- true
if err := testTarget.Scrape(time.Now(), results); err != nil {
if err := testTarget.Scrape(time.Now(), ingester); err != nil {
t.Fatal(err)
}
@ -101,12 +110,12 @@ func TestTargetScrapeTimeout(t *testing.T) {
// now scrape again
signal <- true
if err := testTarget.Scrape(time.Now(), results); err != nil {
if err := testTarget.Scrape(time.Now(), ingester); err != nil {
t.Fatal(err)
}
// now timeout
if err := testTarget.Scrape(time.Now(), results); err == nil {
if err := testTarget.Scrape(time.Now(), ingester); err == nil {
t.Fatal("expected scrape to timeout")
} else {
signal <- true // let handler continue
@ -114,7 +123,7 @@ func TestTargetScrapeTimeout(t *testing.T) {
// now scrape again without timeout
signal <- true
if err := testTarget.Scrape(time.Now(), results); err != nil {
if err := testTarget.Scrape(time.Now(), ingester); err != nil {
t.Fatal(err)
}
}

View file

@ -37,13 +37,13 @@ type TargetManager interface {
type targetManager struct {
requestAllowance chan bool
poolsByJob map[string]*TargetPool
results chan<- *extraction.Result
ingester extraction.Ingester
}
func NewTargetManager(results chan<- *extraction.Result, requestAllowance int) TargetManager {
func NewTargetManager(ingester extraction.Ingester, requestAllowance int) TargetManager {
return &targetManager{
requestAllowance: make(chan bool, requestAllowance),
results: results,
ingester: ingester,
poolsByJob: make(map[string]*TargetPool),
}
}
@ -71,7 +71,7 @@ func (m *targetManager) TargetPoolForJob(job config.JobConfig) *TargetPool {
interval := job.ScrapeInterval()
m.poolsByJob[job.GetName()] = targetPool
// BUG(all): Investigate whether this auto-goroutine creation is desired.
go targetPool.Run(m.results, interval)
go targetPool.Run(m.ingester, interval)
}
return targetPool

View file

@ -56,7 +56,7 @@ func (t fakeTarget) Interval() time.Duration {
return t.interval
}
func (t *fakeTarget) Scrape(e time.Time, r chan<- *extraction.Result) error {
func (t *fakeTarget) Scrape(e time.Time, i extraction.Ingester) error {
t.scrapeCount++
return nil
@ -78,8 +78,7 @@ func (t *fakeTarget) Merge(newTarget Target) {}
func (t *fakeTarget) EstimatedTimeToExecute() time.Duration { return 0 }
func testTargetManager(t test.Tester) {
results := make(chan *extraction.Result, 5)
targetManager := NewTargetManager(results, 3)
targetManager := NewTargetManager(nopIngester{}, 3)
testJob1 := config.JobConfig{
JobConfig: pb.JobConfig{
Name: proto.String("test_job1"),

View file

@ -50,14 +50,14 @@ func NewTargetPool(m TargetManager, p TargetProvider) *TargetPool {
}
}
func (p *TargetPool) Run(results chan<- *extraction.Result, interval time.Duration) {
func (p *TargetPool) Run(ingester extraction.Ingester, interval time.Duration) {
ticker := time.NewTicker(interval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
p.runIteration(results, interval)
p.runIteration(ingester, interval)
case newTarget := <-p.addTargetQueue:
p.addTarget(newTarget)
case newTargets := <-p.replaceTargetsQueue:
@ -116,14 +116,14 @@ func (p *TargetPool) replaceTargets(newTargets []Target) {
p.targets = newTargets
}
func (p *TargetPool) runSingle(earliest time.Time, results chan<- *extraction.Result, t Target) {
func (p *TargetPool) runSingle(earliest time.Time, ingester extraction.Ingester, t Target) {
p.manager.acquire()
defer p.manager.release()
t.Scrape(earliest, results)
t.Scrape(earliest, ingester)
}
func (p *TargetPool) runIteration(results chan<- *extraction.Result, interval time.Duration) {
func (p *TargetPool) runIteration(ingester extraction.Ingester, interval time.Duration) {
if p.targetProvider != nil {
targets, err := p.targetProvider.Targets()
if err != nil {
@ -155,7 +155,7 @@ func (p *TargetPool) runIteration(results chan<- *extraction.Result, interval ti
wait.Add(1)
go func(t Target) {
p.runSingle(now, results, t)
p.runSingle(now, ingester, t)
wait.Done()
}(target)
}

View file

@ -18,8 +18,6 @@ import (
"testing"
"time"
"github.com/prometheus/client_golang/extraction"
"github.com/prometheus/prometheus/utility/test"
)
@ -150,7 +148,7 @@ func TestTargetPoolIterationWithUnhealthyTargetsFinishes(t *testing.T) {
done := make(chan bool)
go func() {
pool.runIteration(make(chan *extraction.Result), time.Duration(0))
pool.runIteration(nopIngester{}, time.Duration(0))
done <- true
}()