*: migrate ingestion to new batch Appender

Fabian Reinartz 2016-12-29 09:27:30 +01:00
parent 86cb0f30fd
commit f8fc1f5bb2
18 changed files with 386 additions and 327 deletions


@ -23,7 +23,6 @@ import (
"syscall" "syscall"
"time" "time"
"github.com/fabxc/tsdb"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/log" "github.com/prometheus/common/log"
"github.com/prometheus/common/version" "github.com/prometheus/common/version"
@ -34,9 +33,7 @@ import (
"github.com/prometheus/prometheus/promql" "github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/retrieval" "github.com/prometheus/prometheus/retrieval"
"github.com/prometheus/prometheus/rules" "github.com/prometheus/prometheus/rules"
"github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/storage/tsdb"
"github.com/prometheus/prometheus/storage/local"
"github.com/prometheus/prometheus/storage/remote"
"github.com/prometheus/prometheus/web" "github.com/prometheus/prometheus/web"
) )
@ -77,19 +74,25 @@ func Main() int {
log.Infoln("Build context", version.BuildContext()) log.Infoln("Build context", version.BuildContext())
var ( var (
sampleAppender = storage.Fanout{} // sampleAppender = storage.Fanout{}
reloadables []Reloadable reloadables []Reloadable
) )
_, err := tsdb.Open(cfg.localStoragePath, nil, nil) localStorage, err := tsdb.Open(cfg.localStoragePath)
if err != nil { if err != nil {
log.Errorf("Opening storage failed: %s", err) log.Errorf("Opening storage failed: %s", err)
return 1
} }
var localStorage local.Storage
reloadableRemoteStorage := remote.New() sampleAppender, err := localStorage.Appender()
sampleAppender = append(sampleAppender, reloadableRemoteStorage) if err != nil {
reloadables = append(reloadables, reloadableRemoteStorage) log.Errorf("Creating sample appender failed: %s", err)
return 1
}
// reloadableRemoteStorage := remote.New()
// sampleAppender = append(sampleAppender, reloadableRemoteStorage)
// reloadables = append(reloadables, reloadableRemoteStorage)
var ( var (
notifier = notifier.New(&cfg.notifier) notifier = notifier.New(&cfg.notifier)
@ -162,18 +165,13 @@ func Main() int {
}() }()
// Start all components. The order is NOT arbitrary. // Start all components. The order is NOT arbitrary.
if err := localStorage.Start(); err != nil {
log.Errorln("Error opening memory series storage:", err)
return 1
}
defer func() { defer func() {
if err := localStorage.Stop(); err != nil { if err := localStorage.Close(); err != nil {
log.Errorln("Error stopping storage:", err) log.Errorln("Error stopping storage:", err)
} }
}() }()
defer reloadableRemoteStorage.Stop() // defer reloadableRemoteStorage.Stop()
// The storage has to be fully initialized before registering. // The storage has to be fully initialized before registering.
if instrumentedStorage, ok := localStorage.(prometheus.Collector); ok { if instrumentedStorage, ok := localStorage.(prometheus.Collector); ok {


@ -2,6 +2,7 @@ package labels
import ( import (
"bytes" "bytes"
"encoding/json"
"sort" "sort"
"strconv" "strconv"
"strings" "strings"
@ -48,6 +49,10 @@ func (ls Labels) String() string {
return b.String() return b.String()
} }
func (ls Labels) MarshalJSON() ([]byte, error) {
return json.Marshal(ls.Map())
}
// Hash returns a hash value for the label set. // Hash returns a hash value for the label set.
func (ls Labels) Hash() uint64 { func (ls Labels) Hash() uint64 {
b := make([]byte, 0, 1024) b := make([]byte, 0, 1024)
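The MarshalJSON method added above serializes a label set through ls.Map(), so Labels encode as a flat JSON object keyed by label name. A minimal usage sketch (package and function names here are illustrative; FromStrings and Map are used elsewhere in this commit):

package labelsjson

import (
	"encoding/json"
	"fmt"

	"github.com/prometheus/prometheus/pkg/labels"
)

// PrintLabels shows the JSON shape produced by the new Labels.MarshalJSON.
func PrintLabels() {
	ls := labels.FromStrings("__name__", "up", "job", "api")
	b, err := json.Marshal(ls) // dispatches to Labels.MarshalJSON, which marshals ls.Map()
	if err != nil {
		panic(err)
	}
	fmt.Println(string(b)) // {"__name__":"up","job":"api"}
}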


@ -336,6 +336,7 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *EvalStmt) (
Seriess[h] = ss Seriess[h] = ss
} }
ss.Points = append(ss.Points, sample.Point) ss.Points = append(ss.Points, sample.Point)
Seriess[h] = ss
} }
default: default:
panic(fmt.Errorf("promql.Engine.exec: invalid expression type %q", val.Type())) panic(fmt.Errorf("promql.Engine.exec: invalid expression type %q", val.Type()))
@ -358,15 +359,13 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *EvalStmt) (
return nil, err return nil, err
} }
// Turn Matrix type with protected metric into model.Matrix.
resMatrix := mat
// TODO(fabxc): order ensured by storage? // TODO(fabxc): order ensured by storage?
// sortTimer := query.stats.GetTimer(stats.ResultSortTime).Start() // TODO(fabxc): where to ensure metric labels are a copy from the storage internals.
// sort.Sort(resMatrix) sortTimer := query.stats.GetTimer(stats.ResultSortTime).Start()
// sortTimer.Stop() sort.Sort(mat)
sortTimer.Stop()
return resMatrix, nil return mat, nil
} }
func (ng *Engine) populateIterators(ctx context.Context, s *EvalStmt) (storage.Querier, error) { func (ng *Engine) populateIterators(ctx context.Context, s *EvalStmt) (storage.Querier, error) {


@ -377,6 +377,11 @@ func (ev *evalCmd) compareResult(result Value) error {
return fmt.Errorf("received instant result on range evaluation") return fmt.Errorf("received instant result on range evaluation")
} }
fmt.Println("vector result", len(val), ev.expr)
for _, ss := range val {
fmt.Println(" ", ss.Metric, ss.Point)
}
seen := map[uint64]bool{} seen := map[uint64]bool{}
for pos, v := range val { for pos, v := range val {
fp := v.Metric.Hash() fp := v.Metric.Hash()


@ -1,7 +1,9 @@
package promql package promql
import ( import (
"encoding/json"
"fmt" "fmt"
"strconv"
"strings" "strings"
"github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/pkg/labels"
@ -40,6 +42,10 @@ func (s String) String() string {
return s.V return s.V
} }
func (s String) MarshalJSON() ([]byte, error) {
return json.Marshal([...]interface{}{float64(s.T) / 1000, s.V})
}
// Scalar is a data point that's explicitly not associated with a metric. // Scalar is a data point that's explicitly not associated with a metric.
type Scalar struct { type Scalar struct {
T int64 T int64
@ -50,10 +56,15 @@ func (s Scalar) String() string {
return fmt.Sprintf("scalar: %v @[%v]", s.V, s.T) return fmt.Sprintf("scalar: %v @[%v]", s.V, s.T)
} }
func (s Scalar) MarshalJSON() ([]byte, error) {
v := strconv.FormatFloat(s.V, 'f', -1, 64)
return json.Marshal([...]interface{}{float64(s.T) / 1000, v})
}
// Series is a stream of data points belonging to a metric. // Series is a stream of data points belonging to a metric.
type Series struct { type Series struct {
Metric labels.Labels Metric labels.Labels `json:"metric"`
Points []Point Points []Point `json:"values"`
} }
func (s Series) String() string { func (s Series) String() string {
@ -74,6 +85,12 @@ func (p Point) String() string {
return fmt.Sprintf("%f @[%d]", p.V, p.T) return fmt.Sprintf("%f @[%d]", p.V, p.T)
} }
// MarshalJSON implements json.Marshaler.
func (p Point) MarshalJSON() ([]byte, error) {
v := strconv.FormatFloat(p.V, 'f', -1, 64)
return json.Marshal([...]interface{}{float64(p.T) / 1000, v})
}
// Sample is a single sample belonging to a metric. // Sample is a single sample belonging to a metric.
type Sample struct { type Sample struct {
Point Point
@ -85,6 +102,17 @@ func (s Sample) String() string {
return fmt.Sprintf("%s => %s", s.Metric, s.Point) return fmt.Sprintf("%s => %s", s.Metric, s.Point)
} }
func (s Sample) MarshalJSON() ([]byte, error) {
v := struct {
M labels.Labels `json:"metric"`
V Point `json:"value"`
}{
M: s.Metric,
V: s.Point,
}
return json.Marshal(v)
}
// Vector is basically only an alias for model.Samples, but the // Vector is basically only an alias for model.Samples, but the
// contract is that in a Vector, all Samples have the same timestamp. // contract is that in a Vector, all Samples have the same timestamp.
type Vector []Sample type Vector []Sample
@ -112,6 +140,10 @@ func (m Matrix) String() string {
return strings.Join(strs, "\n") return strings.Join(strs, "\n")
} }
func (m Matrix) Len() int { return len(m) }
func (m Matrix) Less(i, j int) bool { return labels.Compare(m[i].Metric, m[j].Metric) < 0 }
func (m Matrix) Swap(i, j int) { m[i], m[j] = m[j], m[i] }
// Result holds the resulting value of an execution or an error // Result holds the resulting value of an execution or an error
// if any occurred. // if any occurred.
type Result struct { type Result struct {
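The marshalers added above match the HTTP API's value encoding: Point and Scalar become a [unix_seconds, "value"] pair with the value rendered as a string, and Matrix now implements sort.Interface so evaluation results can be ordered by metric, as execEvalStmt now does. A small sketch of the resulting wire format (assuming Point's exported T, in milliseconds, and V fields as used above):

package promqljson

import (
	"encoding/json"
	"fmt"
	"sort"

	"github.com/prometheus/prometheus/promql"
)

// ShowPoint prints the encoding produced by the new Point.MarshalJSON.
func ShowPoint() {
	p := promql.Point{T: 1483003650000, V: 0.5} // T is in milliseconds
	b, _ := json.Marshal(p)
	fmt.Println(string(b)) // [1483003650,"0.5"]

	var m promql.Matrix
	sort.Sort(m) // orders series by labels.Compare on their metric
}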


@ -14,38 +14,36 @@
package retrieval package retrieval
import ( import (
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/pkg/labels"
) )
type nopAppender struct{} type nopAppender struct{}
func (a nopAppender) Append(*model.Sample) error { func (a nopAppender) Add(l labels.Labels, t int64, v float64) error { return nil }
return nil func (a nopAppender) Commit() error { return nil }
}
func (a nopAppender) NeedsThrottling() bool {
return false
}
type collectResultAppender struct { type collectResultAppender struct {
result model.Samples result []sample
throttled bool throttled bool
} }
func (a *collectResultAppender) Append(s *model.Sample) error { func (a *collectResultAppender) Add(l labels.Labels, t int64, v float64) error {
for ln, lv := range s.Metric { // for ln, lv := range s.Metric {
if len(lv) == 0 { // if len(lv) == 0 {
delete(s.Metric, ln) // delete(s.Metric, ln)
} // }
} // }
a.result = append(a.result, s) a.result = append(a.result, sample{
metric: l,
t: t,
v: v,
})
return nil return nil
} }
func (a *collectResultAppender) NeedsThrottling() bool { func (a *collectResultAppender) Commit() error {
return a.throttled return nil
} }
// fakeTargetProvider implements a TargetProvider and allows manual injection // fakeTargetProvider implements a TargetProvider and allows manual injection


@ -19,6 +19,7 @@ import (
"net/http" "net/http"
"sync" "sync"
"time" "time"
"unsafe"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/expfmt" "github.com/prometheus/common/expfmt"
@ -28,8 +29,8 @@ import (
"golang.org/x/net/context/ctxhttp" "golang.org/x/net/context/ctxhttp"
"github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/storage/local"
) )
const ( const (
@ -88,7 +89,7 @@ func init() {
// scrapePool manages scrapes for sets of targets. // scrapePool manages scrapes for sets of targets.
type scrapePool struct { type scrapePool struct {
appender storage.SampleAppender appender storage.Appender
ctx context.Context ctx context.Context
@ -101,10 +102,10 @@ type scrapePool struct {
loops map[uint64]loop loops map[uint64]loop
// Constructor for new scrape loops. This is settable for testing convenience. // Constructor for new scrape loops. This is settable for testing convenience.
newLoop func(context.Context, scraper, storage.SampleAppender, storage.SampleAppender) loop newLoop func(context.Context, scraper, storage.Appender, storage.Appender) loop
} }
func newScrapePool(ctx context.Context, cfg *config.ScrapeConfig, app storage.SampleAppender) *scrapePool { func newScrapePool(ctx context.Context, cfg *config.ScrapeConfig, app storage.Appender) *scrapePool {
client, err := NewHTTPClient(cfg.HTTPClientConfig) client, err := NewHTTPClient(cfg.HTTPClientConfig)
if err != nil { if err != nil {
// Any errors that could occur here should be caught during config validation. // Any errors that could occur here should be caught during config validation.
@ -264,42 +265,42 @@ func (sp *scrapePool) sync(targets []*Target) {
} }
// sampleAppender returns an appender for ingested samples from the target. // sampleAppender returns an appender for ingested samples from the target.
func (sp *scrapePool) sampleAppender(target *Target) storage.SampleAppender { func (sp *scrapePool) sampleAppender(target *Target) storage.Appender {
app := sp.appender app := sp.appender
// The relabelAppender has to be inside the label-modifying appenders // The relabelAppender has to be inside the label-modifying appenders
// so the relabeling rules are applied to the correct label set. // so the relabeling rules are applied to the correct label set.
if mrc := sp.config.MetricRelabelConfigs; len(mrc) > 0 { if mrc := sp.config.MetricRelabelConfigs; len(mrc) > 0 {
app = relabelAppender{ app = relabelAppender{
SampleAppender: app, Appender: app,
relabelings: mrc, relabelings: mrc,
} }
} }
if sp.config.HonorLabels { if sp.config.HonorLabels {
app = honorLabelsAppender{ app = honorLabelsAppender{
SampleAppender: app, Appender: app,
labels: target.Labels(), labels: target.Labels(),
} }
} else { } else {
app = ruleLabelsAppender{ app = ruleLabelsAppender{
SampleAppender: app, Appender: app,
labels: target.Labels(), labels: target.Labels(),
} }
} }
return app return app
} }
// reportAppender returns an appender for reporting samples for the target. // reportAppender returns an appender for reporting samples for the target.
func (sp *scrapePool) reportAppender(target *Target) storage.SampleAppender { func (sp *scrapePool) reportAppender(target *Target) storage.Appender {
return ruleLabelsAppender{ return ruleLabelsAppender{
SampleAppender: sp.appender, Appender: sp.appender,
labels: target.Labels(), labels: target.Labels(),
} }
} }
// A scraper retrieves samples and accepts a status report at the end. // A scraper retrieves samples and accepts a status report at the end.
type scraper interface { type scraper interface {
scrape(ctx context.Context, ts time.Time) (model.Samples, error) scrape(ctx context.Context, ts time.Time) (samples, error)
report(start time.Time, dur time.Duration, err error) report(start time.Time, dur time.Duration, err error)
offset(interval time.Duration) time.Duration offset(interval time.Duration) time.Duration
} }
@ -312,7 +313,7 @@ type targetScraper struct {
const acceptHeader = `application/vnd.google.protobuf;proto=io.prometheus.client.MetricFamily;encoding=delimited;q=0.7,text/plain;version=0.0.4;q=0.3,*/*;q=0.1` const acceptHeader = `application/vnd.google.protobuf;proto=io.prometheus.client.MetricFamily;encoding=delimited;q=0.7,text/plain;version=0.0.4;q=0.3,*/*;q=0.1`
func (s *targetScraper) scrape(ctx context.Context, ts time.Time) (model.Samples, error) { func (s *targetScraper) scrape(ctx context.Context, ts time.Time) (samples, error) {
req, err := http.NewRequest("GET", s.URL().String(), nil) req, err := http.NewRequest("GET", s.URL().String(), nil)
if err != nil { if err != nil {
return nil, err return nil, err
@ -330,7 +331,7 @@ func (s *targetScraper) scrape(ctx context.Context, ts time.Time) (model.Samples
} }
var ( var (
allSamples = make(model.Samples, 0, 200) allSamples = make(samples, 0, 200)
decSamples = make(model.Vector, 0, 50) decSamples = make(model.Vector, 0, 50)
) )
sdec := expfmt.SampleDecoder{ sdec := expfmt.SampleDecoder{
@ -344,7 +345,13 @@ func (s *targetScraper) scrape(ctx context.Context, ts time.Time) (model.Samples
if err = sdec.Decode(&decSamples); err != nil { if err = sdec.Decode(&decSamples); err != nil {
break break
} }
allSamples = append(allSamples, decSamples...) for _, s := range decSamples {
allSamples = append(allSamples, sample{
metric: labels.FromMap(*(*map[string]string)(unsafe.Pointer(&s.Metric))),
t: int64(s.Timestamp),
v: float64(s.Value),
})
}
decSamples = decSamples[:0] decSamples = decSamples[:0]
} }
@ -364,15 +371,15 @@ type loop interface {
type scrapeLoop struct { type scrapeLoop struct {
scraper scraper scraper scraper
appender storage.SampleAppender appender storage.Appender
reportAppender storage.SampleAppender reportAppender storage.Appender
done chan struct{} done chan struct{}
ctx context.Context ctx context.Context
cancel func() cancel func()
} }
func newScrapeLoop(ctx context.Context, sc scraper, app, reportApp storage.SampleAppender) loop { func newScrapeLoop(ctx context.Context, sc scraper, app, reportApp storage.Appender) loop {
sl := &scrapeLoop{ sl := &scrapeLoop{
scraper: sc, scraper: sc,
appender: app, appender: app,
@ -406,32 +413,28 @@ func (sl *scrapeLoop) run(interval, timeout time.Duration, errc chan<- error) {
default: default:
} }
if !sl.appender.NeedsThrottling() { var (
var ( start = time.Now()
start = time.Now() scrapeCtx, _ = context.WithTimeout(sl.ctx, timeout)
scrapeCtx, _ = context.WithTimeout(sl.ctx, timeout) )
// Only record after the first scrape.
if !last.IsZero() {
targetIntervalLength.WithLabelValues(interval.String()).Observe(
time.Since(last).Seconds(),
) )
// Only record after the first scrape.
if !last.IsZero() {
targetIntervalLength.WithLabelValues(interval.String()).Observe(
time.Since(last).Seconds(),
)
}
samples, err := sl.scraper.scrape(scrapeCtx, start)
if err == nil {
sl.append(samples)
} else if errc != nil {
errc <- err
}
sl.report(start, time.Since(start), len(samples), err)
last = start
} else {
targetSkippedScrapes.Inc()
} }
samples, err := sl.scraper.scrape(scrapeCtx, start)
if err == nil {
sl.append(samples)
} else if errc != nil {
errc <- err
}
sl.report(start, time.Since(start), len(samples), err)
last = start
select { select {
case <-sl.ctx.Done(): case <-sl.ctx.Done():
return return
@ -445,19 +448,40 @@ func (sl *scrapeLoop) stop() {
<-sl.done <-sl.done
} }
func (sl *scrapeLoop) append(samples model.Samples) { type sample struct {
metric labels.Labels
t int64
v float64
}
type samples []sample
func (s samples) Len() int { return len(s) }
func (s samples) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
func (s samples) Less(i, j int) bool {
d := labels.Compare(s[i].metric, s[j].metric)
if d < 0 {
return true
} else if d > 0 {
return false
}
return s[i].t < s[j].t
}
func (sl *scrapeLoop) append(samples samples) {
var ( var (
numOutOfOrder = 0 numOutOfOrder = 0
numDuplicates = 0 numDuplicates = 0
) )
for _, s := range samples { for _, s := range samples {
if err := sl.appender.Append(s); err != nil { if err := sl.appender.Add(s.metric, s.t, s.v); err != nil {
switch err { switch err {
case local.ErrOutOfOrderSample: case storage.ErrOutOfOrderSample:
numOutOfOrder++ numOutOfOrder++
log.With("sample", s).With("error", err).Debug("Sample discarded") log.With("sample", s).With("error", err).Debug("Sample discarded")
case local.ErrDuplicateSampleForTimestamp: case storage.ErrDuplicateSampleForTimestamp:
numDuplicates++ numDuplicates++
log.With("sample", s).With("error", err).Debug("Sample discarded") log.With("sample", s).With("error", err).Debug("Sample discarded")
default: default:
@ -471,47 +495,41 @@ func (sl *scrapeLoop) append(samples model.Samples) {
if numDuplicates > 0 { if numDuplicates > 0 {
log.With("numDropped", numDuplicates).Warn("Error on ingesting samples with different value but same timestamp") log.With("numDropped", numDuplicates).Warn("Error on ingesting samples with different value but same timestamp")
} }
if err := sl.appender.Commit(); err != nil {
log.With("err", err).Warn("Error commiting scrape")
}
} }
func (sl *scrapeLoop) report(start time.Time, duration time.Duration, scrapedSamples int, err error) { func (sl *scrapeLoop) report(start time.Time, duration time.Duration, scrapedSamples int, err error) {
sl.scraper.report(start, duration, err) sl.scraper.report(start, duration, err)
ts := model.TimeFromUnixNano(start.UnixNano()) ts := int64(model.TimeFromUnixNano(start.UnixNano()))
var health model.SampleValue var health float64
if err == nil { if err == nil {
health = 1 health = 1
} }
healthSample := &model.Sample{ var (
Metric: model.Metric{ healthMet = labels.Labels{
model.MetricNameLabel: scrapeHealthMetricName, labels.Label{Name: labels.MetricName, Value: scrapeHealthMetricName},
}, }
Timestamp: ts, durationMet = labels.Labels{
Value: health, labels.Label{Name: labels.MetricName, Value: scrapeDurationMetricName},
} }
durationSample := &model.Sample{ countMet = labels.Labels{
Metric: model.Metric{ labels.Label{Name: labels.MetricName, Value: scrapeSamplesMetricName},
model.MetricNameLabel: scrapeDurationMetricName, }
}, )
Timestamp: ts,
Value: model.SampleValue(duration.Seconds()),
}
countSample := &model.Sample{
Metric: model.Metric{
model.MetricNameLabel: scrapeSamplesMetricName,
},
Timestamp: ts,
Value: model.SampleValue(scrapedSamples),
}
if err := sl.reportAppender.Append(healthSample); err != nil { if err := sl.reportAppender.Add(healthMet, ts, health); err != nil {
log.With("sample", healthSample).With("error", err).Warn("Scrape health sample discarded") log.With("error", err).Warn("Scrape health sample discarded")
} }
if err := sl.reportAppender.Append(durationSample); err != nil { if err := sl.reportAppender.Add(durationMet, ts, duration.Seconds()); err != nil {
log.With("sample", durationSample).With("error", err).Warn("Scrape duration sample discarded") log.With("error", err).Warn("Scrape duration sample discarded")
} }
if err := sl.reportAppender.Append(countSample); err != nil { if err := sl.reportAppender.Add(countMet, ts, float64(scrapedSamples)); err != nil {
log.With("sample", durationSample).With("error", err).Warn("Scrape sample count sample discarded") log.With("error", err).Warn("Scrape sample count sample discarded")
} }
} }
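Taken together, the scrape loop now buffers one scrape's worth of samples and feeds them to the batch Appender: one Add per sample, with out-of-order and duplicate errors merely counted and logged, and a single Commit per scrape to flush the batch. Condensed into a sketch (types mirror the unexported ones above; this is not a verbatim excerpt):

package scrapesketch

import (
	"log"

	"github.com/prometheus/prometheus/pkg/labels"
	"github.com/prometheus/prometheus/storage"
)

// scrapedSample mirrors the unexported sample type introduced above.
type scrapedSample struct {
	metric labels.Labels
	t      int64
	v      float64
}

// ingestScrape condenses what scrapeLoop.append now does: one Add per
// sample, one Commit for the whole scrape.
func ingestScrape(app storage.Appender, smpls []scrapedSample) {
	for _, s := range smpls {
		if err := app.Add(s.metric, s.t, s.v); err != nil {
			switch err {
			case storage.ErrOutOfOrderSample, storage.ErrDuplicateSampleForTimestamp:
				// dropped: only a debug log and a counter in the real loop
			default:
				log.Println("unexpected append error:", err)
			}
		}
	}
	if err := app.Commit(); err != nil {
		log.Println("commit failed:", err)
	}
}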


@ -29,6 +29,8 @@ import (
"golang.org/x/net/context" "golang.org/x/net/context"
"github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/pkg/timestamp"
"github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/storage"
) )
@ -78,9 +80,7 @@ func TestScrapePoolStop(t *testing.T) {
for i := 0; i < numTargets; i++ { for i := 0; i < numTargets; i++ {
t := &Target{ t := &Target{
labels: model.LabelSet{ labels: labels.FromStrings(model.AddressLabel, fmt.Sprintf("example.com:%d", i)),
model.AddressLabel: model.LabelValue(fmt.Sprintf("example.com:%d", i)),
},
} }
l := &testLoop{} l := &testLoop{}
l.stopFunc = func() { l.stopFunc = func() {
@ -139,7 +139,7 @@ func TestScrapePoolReload(t *testing.T) {
} }
// On starting to run, new loops created on reload check whether their preceding // On starting to run, new loops created on reload check whether their preceding
// equivalents have been stopped. // equivalents have been stopped.
newLoop := func(ctx context.Context, s scraper, app, reportApp storage.SampleAppender) loop { newLoop := func(ctx context.Context, s scraper, app, reportApp storage.Appender) loop {
l := &testLoop{} l := &testLoop{}
l.startFunc = func(interval, timeout time.Duration, errc chan<- error) { l.startFunc = func(interval, timeout time.Duration, errc chan<- error) {
if interval != 3*time.Second { if interval != 3*time.Second {
@ -168,9 +168,7 @@ func TestScrapePoolReload(t *testing.T) {
for i := 0; i < numTargets; i++ { for i := 0; i < numTargets; i++ {
t := &Target{ t := &Target{
labels: model.LabelSet{ labels: labels.FromStrings(model.AddressLabel, fmt.Sprintf("example.com:%d", i)),
model.AddressLabel: model.LabelValue(fmt.Sprintf("example.com:%d", i)),
},
} }
l := &testLoop{} l := &testLoop{}
l.stopFunc = func() { l.stopFunc = func() {
@ -240,8 +238,8 @@ func TestScrapePoolReportAppender(t *testing.T) {
if !ok { if !ok {
t.Fatalf("Expected ruleLabelsAppender but got %T", wrapped) t.Fatalf("Expected ruleLabelsAppender but got %T", wrapped)
} }
if rl.SampleAppender != app { if rl.Appender != app {
t.Fatalf("Expected base appender but got %T", rl.SampleAppender) t.Fatalf("Expected base appender but got %T", rl.Appender)
} }
cfg.HonorLabels = true cfg.HonorLabels = true
@ -251,8 +249,8 @@ func TestScrapePoolReportAppender(t *testing.T) {
if !ok { if !ok {
t.Fatalf("Expected ruleLabelsAppender but got %T", wrapped) t.Fatalf("Expected ruleLabelsAppender but got %T", wrapped)
} }
if hl.SampleAppender != app { if hl.Appender != app {
t.Fatalf("Expected base appender but got %T", hl.SampleAppender) t.Fatalf("Expected base appender but got %T", hl.Appender)
} }
} }
@ -275,12 +273,12 @@ func TestScrapePoolSampleAppender(t *testing.T) {
if !ok { if !ok {
t.Fatalf("Expected ruleLabelsAppender but got %T", wrapped) t.Fatalf("Expected ruleLabelsAppender but got %T", wrapped)
} }
re, ok := rl.SampleAppender.(relabelAppender) re, ok := rl.Appender.(relabelAppender)
if !ok { if !ok {
t.Fatalf("Expected relabelAppender but got %T", rl.SampleAppender) t.Fatalf("Expected relabelAppender but got %T", rl.Appender)
} }
if re.SampleAppender != app { if re.Appender != app {
t.Fatalf("Expected base appender but got %T", re.SampleAppender) t.Fatalf("Expected base appender but got %T", re.Appender)
} }
cfg.HonorLabels = true cfg.HonorLabels = true
@ -290,12 +288,12 @@ func TestScrapePoolSampleAppender(t *testing.T) {
if !ok { if !ok {
t.Fatalf("Expected honorLabelsAppender but got %T", wrapped) t.Fatalf("Expected honorLabelsAppender but got %T", wrapped)
} }
re, ok = hl.SampleAppender.(relabelAppender) re, ok = hl.Appender.(relabelAppender)
if !ok { if !ok {
t.Fatalf("Expected relabelAppender but got %T", hl.SampleAppender) t.Fatalf("Expected relabelAppender but got %T", hl.Appender)
} }
if re.SampleAppender != app { if re.Appender != app {
t.Fatalf("Expected base appender but got %T", re.SampleAppender) t.Fatalf("Expected base appender but got %T", re.Appender)
} }
} }
@ -322,7 +320,7 @@ func TestScrapeLoopStop(t *testing.T) {
} }
// Running the scrape loop must exit before calling the scraper even once. // Running the scrape loop must exit before calling the scraper even once.
scraper.scrapeFunc = func(context.Context, time.Time) (model.Samples, error) { scraper.scrapeFunc = func(context.Context, time.Time) (samples, error) {
t.Fatalf("scraper was called for terminated scrape loop") t.Fatalf("scraper was called for terminated scrape loop")
return nil, nil return nil, nil
} }
@ -386,7 +384,7 @@ func TestScrapeLoopRun(t *testing.T) {
scraper.offsetDur = 0 scraper.offsetDur = 0
block := make(chan struct{}) block := make(chan struct{})
scraper.scrapeFunc = func(ctx context.Context, ts time.Time) (model.Samples, error) { scraper.scrapeFunc = func(ctx context.Context, ts time.Time) (samples, error) {
select { select {
case <-block: case <-block:
case <-ctx.Done(): case <-ctx.Done():
@ -444,39 +442,39 @@ func TestTargetScraperScrapeOK(t *testing.T) {
ts := &targetScraper{ ts := &targetScraper{
Target: &Target{ Target: &Target{
labels: model.LabelSet{ labels: labels.FromStrings(
model.SchemeLabel: model.LabelValue(serverURL.Scheme), model.SchemeLabel, serverURL.Scheme,
model.AddressLabel: model.LabelValue(serverURL.Host), model.AddressLabel, serverURL.Host,
}, ),
}, },
client: http.DefaultClient, client: http.DefaultClient,
} }
now := time.Now() now := time.Now()
samples, err := ts.scrape(context.Background(), now) smpls, err := ts.scrape(context.Background(), now)
if err != nil { if err != nil {
t.Fatalf("Unexpected scrape error: %s", err) t.Fatalf("Unexpected scrape error: %s", err)
} }
expectedSamples := model.Samples{ expectedSamples := samples{
{ sample{
Metric: model.Metric{"__name__": "metric_a"}, metric: labels.FromStrings(labels.MetricName, "metric_a"),
Timestamp: model.TimeFromUnixNano(now.UnixNano()), t: timestamp.FromTime(now),
Value: 1, v: 1,
}, },
{ sample{
Metric: model.Metric{"__name__": "metric_b"}, metric: labels.FromStrings(labels.MetricName, "metric_b"),
Timestamp: model.TimeFromUnixNano(now.UnixNano()), t: timestamp.FromTime(now),
Value: 2, v: 2,
}, },
} }
sort.Sort(expectedSamples) sort.Sort(expectedSamples)
sort.Sort(samples) sort.Sort(smpls)
if !reflect.DeepEqual(samples, expectedSamples) { if !reflect.DeepEqual(smpls, expectedSamples) {
t.Errorf("Scraped samples did not match served metrics") t.Errorf("Scraped samples did not match served metrics")
t.Errorf("Expected: %v", expectedSamples) t.Errorf("Expected: %v", expectedSamples)
t.Fatalf("Got: %v", samples) t.Fatalf("Got: %v", smpls)
} }
} }
@ -497,10 +495,10 @@ func TestTargetScrapeScrapeCancel(t *testing.T) {
ts := &targetScraper{ ts := &targetScraper{
Target: &Target{ Target: &Target{
labels: model.LabelSet{ labels: labels.FromStrings(
model.SchemeLabel: model.LabelValue(serverURL.Scheme), model.SchemeLabel, serverURL.Scheme,
model.AddressLabel: model.LabelValue(serverURL.Host), model.AddressLabel, serverURL.Host,
}, ),
}, },
client: http.DefaultClient, client: http.DefaultClient,
} }
@ -548,10 +546,10 @@ func TestTargetScrapeScrapeNotFound(t *testing.T) {
ts := &targetScraper{ ts := &targetScraper{
Target: &Target{ Target: &Target{
labels: model.LabelSet{ labels: labels.FromStrings(
model.SchemeLabel: model.LabelValue(serverURL.Scheme), model.SchemeLabel, serverURL.Scheme,
model.AddressLabel: model.LabelValue(serverURL.Host), model.AddressLabel, serverURL.Host,
}, ),
}, },
client: http.DefaultClient, client: http.DefaultClient,
} }
@ -570,9 +568,9 @@ type testScraper struct {
lastDuration time.Duration lastDuration time.Duration
lastError error lastError error
samples model.Samples samples samples
scrapeErr error scrapeErr error
scrapeFunc func(context.Context, time.Time) (model.Samples, error) scrapeFunc func(context.Context, time.Time) (samples, error)
} }
func (ts *testScraper) offset(interval time.Duration) time.Duration { func (ts *testScraper) offset(interval time.Duration) time.Duration {
@ -585,7 +583,7 @@ func (ts *testScraper) report(start time.Time, duration time.Duration, err error
ts.lastError = err ts.lastError = err
} }
func (ts *testScraper) scrape(ctx context.Context, t time.Time) (model.Samples, error) { func (ts *testScraper) scrape(ctx context.Context, t time.Time) (samples, error) {
if ts.scrapeFunc != nil { if ts.scrapeFunc != nil {
return ts.scrapeFunc(ctx, t) return ts.scrapeFunc(ctx, t)
} }


@ -27,7 +27,8 @@ import (
"github.com/prometheus/common/model" "github.com/prometheus/common/model"
"github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/relabel" "github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/pkg/relabel"
"github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/util/httputil" "github.com/prometheus/prometheus/util/httputil"
) )
@ -45,9 +46,9 @@ const (
// Target refers to a singular HTTP or HTTPS endpoint. // Target refers to a singular HTTP or HTTPS endpoint.
type Target struct { type Target struct {
// Labels before any processing. // Labels before any processing.
discoveredLabels model.LabelSet discoveredLabels labels.Labels
// Any labels that are added to this target and its metrics. // Any labels that are added to this target and its metrics.
labels model.LabelSet labels labels.Labels
// Additional URL parmeters that are part of the target URL. // Additional URL parmeters that are part of the target URL.
params url.Values params url.Values
@ -58,7 +59,7 @@ type Target struct {
} }
// NewTarget creates a reasonably configured target for querying. // NewTarget creates a reasonably configured target for querying.
func NewTarget(labels, discoveredLabels model.LabelSet, params url.Values) *Target { func NewTarget(labels, discoveredLabels labels.Labels, params url.Values) *Target {
return &Target{ return &Target{
labels: labels, labels: labels,
discoveredLabels: discoveredLabels, discoveredLabels: discoveredLabels,
@ -111,7 +112,7 @@ func (t *Target) String() string {
// hash returns an identifying hash for the target. // hash returns an identifying hash for the target.
func (t *Target) hash() uint64 { func (t *Target) hash() uint64 {
h := fnv.New64a() h := fnv.New64a()
h.Write([]byte(t.labels.Fingerprint().String())) h.Write([]byte(fmt.Sprintf("%016d", t.labels.Hash())))
h.Write([]byte(t.URL().String())) h.Write([]byte(t.URL().String()))
return h.Sum64() return h.Sum64()
@ -134,19 +135,21 @@ func (t *Target) offset(interval time.Duration) time.Duration {
} }
// Labels returns a copy of the set of all public labels of the target. // Labels returns a copy of the set of all public labels of the target.
func (t *Target) Labels() model.LabelSet { func (t *Target) Labels() labels.Labels {
lset := make(model.LabelSet, len(t.labels)) lset := make(labels.Labels, 0, len(t.labels))
for ln, lv := range t.labels { for _, l := range t.labels {
if !strings.HasPrefix(string(ln), model.ReservedLabelPrefix) { if !strings.HasPrefix(l.Name, model.ReservedLabelPrefix) {
lset[ln] = lv lset = append(lset, l)
} }
} }
return lset return lset
} }
// DiscoveredLabels returns a copy of the target's labels before any processing. // DiscoveredLabels returns a copy of the target's labels before any processing.
func (t *Target) DiscoveredLabels() model.LabelSet { func (t *Target) DiscoveredLabels() labels.Labels {
return t.discoveredLabels.Clone() lset := make(labels.Labels, len(t.discoveredLabels))
copy(lset, t.discoveredLabels)
return lset
} }
// URL returns a copy of the target's URL. // URL returns a copy of the target's URL.
@ -157,23 +160,23 @@ func (t *Target) URL() *url.URL {
params[k] = make([]string, len(v)) params[k] = make([]string, len(v))
copy(params[k], v) copy(params[k], v)
} }
for k, v := range t.labels { for _, l := range t.labels {
if !strings.HasPrefix(string(k), model.ParamLabelPrefix) { if !strings.HasPrefix(l.Name, model.ParamLabelPrefix) {
continue continue
} }
ks := string(k[len(model.ParamLabelPrefix):]) ks := l.Name[len(model.ParamLabelPrefix):]
if len(params[ks]) > 0 { if len(params[ks]) > 0 {
params[ks][0] = string(v) params[ks][0] = string(l.Value)
} else { } else {
params[ks] = []string{string(v)} params[ks] = []string{l.Value}
} }
} }
return &url.URL{ return &url.URL{
Scheme: string(t.labels[model.SchemeLabel]), Scheme: string(t.labels.Get(model.SchemeLabel)),
Host: string(t.labels[model.AddressLabel]), Host: string(t.labels.Get(model.AddressLabel)),
Path: string(t.labels[model.MetricsPathLabel]), Path: string(t.labels.Get(model.MetricsPathLabel)),
RawQuery: params.Encode(), RawQuery: params.Encode(),
} }
} }
@ -226,92 +229,98 @@ func (ts Targets) Swap(i, j int) { ts[i], ts[j] = ts[j], ts[i] }
// Merges the ingested sample's metric with the label set. On a collision the // Merges the ingested sample's metric with the label set. On a collision the
// value of the ingested label is stored in a label prefixed with 'exported_'. // value of the ingested label is stored in a label prefixed with 'exported_'.
type ruleLabelsAppender struct { type ruleLabelsAppender struct {
storage.SampleAppender storage.Appender
labels model.LabelSet labels labels.Labels
} }
func (app ruleLabelsAppender) Append(s *model.Sample) error { func (app ruleLabelsAppender) Add(lset labels.Labels, t int64, v float64) error {
for ln, lv := range app.labels { lb := labels.NewBuilder(lset)
if v, ok := s.Metric[ln]; ok && v != "" {
s.Metric[model.ExportedLabelPrefix+ln] = v for _, l := range app.labels {
lv := lset.Get(l.Name)
if lv != "" {
lb.Set(model.ExportedLabelPrefix+l.Name, lv)
} }
s.Metric[ln] = lv lb.Set(l.Name, l.Value)
} }
return app.SampleAppender.Append(s) return app.Appender.Add(lb.Labels(), t, v)
} }
type honorLabelsAppender struct { type honorLabelsAppender struct {
storage.SampleAppender storage.Appender
labels model.LabelSet labels labels.Labels
} }
// Merges the sample's metric with the given labels if the label is not // Merges the sample's metric with the given labels if the label is not
// already present in the metric. // already present in the metric.
// This also considers labels explicitly set to the empty string. // This also considers labels explicitly set to the empty string.
func (app honorLabelsAppender) Append(s *model.Sample) error { func (app honorLabelsAppender) Add(lset labels.Labels, t int64, v float64) error {
for ln, lv := range app.labels { lb := labels.NewBuilder(lset)
if _, ok := s.Metric[ln]; !ok {
s.Metric[ln] = lv for _, l := range app.labels {
if lv := lset.Get(l.Name); lv == "" {
lb.Set(l.Name, l.Value)
} }
} }
return app.SampleAppender.Append(s) return app.Appender.Add(lb.Labels(), t, v)
} }
// Applies a set of relabel configurations to the sample's metric // Applies a set of relabel configurations to the sample's metric
// before actually appending it. // before actually appending it.
type relabelAppender struct { type relabelAppender struct {
storage.SampleAppender storage.Appender
relabelings []*config.RelabelConfig relabelings []*config.RelabelConfig
} }
func (app relabelAppender) Append(s *model.Sample) error { func (app relabelAppender) Add(lset labels.Labels, t int64, v float64) error {
labels := relabel.Process(model.LabelSet(s.Metric), app.relabelings...) lset = relabel.Process(lset, app.relabelings...)
// Check if the timeseries was dropped. // Check if the timeseries was dropped.
if labels == nil { if lset == nil {
return nil return nil
} }
s.Metric = model.Metric(labels) return app.Appender.Add(lset, t, v)
return app.SampleAppender.Append(s)
} }
// populateLabels builds a label set from the given label set and scrape configuration. // populateLabels builds a label set from the given label set and scrape configuration.
// It returns a label set before relabeling was applied as the second return value. // It returns a label set before relabeling was applied as the second return value.
// Returns a nil label set if the target is dropped during relabeling. // Returns a nil label set if the target is dropped during relabeling.
func populateLabels(lset model.LabelSet, cfg *config.ScrapeConfig) (res, orig model.LabelSet, err error) { func populateLabels(lset labels.Labels, cfg *config.ScrapeConfig) (res, orig labels.Labels, err error) {
if _, ok := lset[model.AddressLabel]; !ok { if v := lset.Get(model.AddressLabel); v == "" {
return nil, nil, fmt.Errorf("no address") return nil, nil, fmt.Errorf("no address")
} }
// Copy labels into the labelset for the target if they are not // Copy labels into the labelset for the target if they are not set already.
// set already. Apply the labelsets in order of decreasing precedence. scrapeLabels := []labels.Label{
scrapeLabels := model.LabelSet{ {Name: model.JobLabel, Value: cfg.JobName},
model.SchemeLabel: model.LabelValue(cfg.Scheme), {Name: model.MetricsPathLabel, Value: cfg.MetricsPath},
model.MetricsPathLabel: model.LabelValue(cfg.MetricsPath), {Name: model.SchemeLabel, Value: cfg.Scheme},
model.JobLabel: model.LabelValue(cfg.JobName),
} }
for ln, lv := range scrapeLabels { lb := labels.NewBuilder(lset)
if _, ok := lset[ln]; !ok {
lset[ln] = lv for _, l := range scrapeLabels {
if lv := lset.Get(l.Name); lv == "" {
lb.Set(l.Name, l.Value)
} }
} }
// Encode scrape query parameters as labels. // Encode scrape query parameters as labels.
for k, v := range cfg.Params { for k, v := range cfg.Params {
if len(v) > 0 { if len(v) > 0 {
lset[model.LabelName(model.ParamLabelPrefix+k)] = model.LabelValue(v[0]) lb.Set(model.ParamLabelPrefix+k, v[0])
} }
} }
preRelabelLabels := lset.Clone() preRelabelLabels := lb.Labels()
lset = relabel.Process(lset, cfg.RelabelConfigs...) lset = relabel.Process(preRelabelLabels, cfg.RelabelConfigs...)
// Check if the target was dropped. // Check if the target was dropped.
if lset == nil { if lset == nil {
return nil, nil, nil return nil, nil, nil
} }
lb = labels.NewBuilder(lset)
// addPort checks whether we should add a default port to the address. // addPort checks whether we should add a default port to the address.
// If the address is not valid, we don't append a port either. // If the address is not valid, we don't append a port either.
addPort := func(s string) bool { addPort := func(s string) bool {
@ -324,10 +333,11 @@ func populateLabels(lset model.LabelSet, cfg *config.ScrapeConfig) (res, orig mo
_, _, err := net.SplitHostPort(s + ":1234") _, _, err := net.SplitHostPort(s + ":1234")
return err == nil return err == nil
} }
addr := lset.Get(model.AddressLabel)
// If it's an address with no trailing port, infer it based on the used scheme. // If it's an address with no trailing port, infer it based on the used scheme.
if addr := string(lset[model.AddressLabel]); addPort(addr) { if addPort(addr) {
// Addresses reaching this point are already wrapped in [] if necessary. // Addresses reaching this point are already wrapped in [] if necessary.
switch lset[model.SchemeLabel] { switch lset.Get(model.SchemeLabel) {
case "http", "": case "http", "":
addr = addr + ":80" addr = addr + ":80"
case "https": case "https":
@ -335,44 +345,52 @@ func populateLabels(lset model.LabelSet, cfg *config.ScrapeConfig) (res, orig mo
default: default:
return nil, nil, fmt.Errorf("invalid scheme: %q", cfg.Scheme) return nil, nil, fmt.Errorf("invalid scheme: %q", cfg.Scheme)
} }
lset[model.AddressLabel] = model.LabelValue(addr) lb.Set(model.AddressLabel, addr)
} }
if err := config.CheckTargetAddress(lset[model.AddressLabel]); err != nil {
if err := config.CheckTargetAddress(model.LabelValue(addr)); err != nil {
return nil, nil, err return nil, nil, err
} }
// Meta labels are deleted after relabelling. Other internal labels propagate to // Meta labels are deleted after relabelling. Other internal labels propagate to
// the target which decides whether they will be part of their label set. // the target which decides whether they will be part of their label set.
for ln := range lset { for _, l := range lset {
if strings.HasPrefix(string(ln), model.MetaLabelPrefix) { if strings.HasPrefix(l.Name, model.MetaLabelPrefix) {
delete(lset, ln) lb.Del(l.Name)
} }
} }
// Default the instance label to the target address. // Default the instance label to the target address.
if _, ok := lset[model.InstanceLabel]; !ok { if v := lset.Get(model.InstanceLabel); v == "" {
lset[model.InstanceLabel] = lset[model.AddressLabel] lb.Set(model.InstanceLabel, addr)
} }
return lset, preRelabelLabels, nil return lb.Labels(), preRelabelLabels, nil
} }
// targetsFromGroup builds targets based on the given TargetGroup and config. // targetsFromGroup builds targets based on the given TargetGroup and config.
func targetsFromGroup(tg *config.TargetGroup, cfg *config.ScrapeConfig) ([]*Target, error) { func targetsFromGroup(tg *config.TargetGroup, cfg *config.ScrapeConfig) ([]*Target, error) {
targets := make([]*Target, 0, len(tg.Targets)) targets := make([]*Target, 0, len(tg.Targets))
for i, lset := range tg.Targets { for i, tlset := range tg.Targets {
// Combine target labels with target group labels. lbls := make([]labels.Label, 0, len(tlset)+len(tg.Labels))
for ln, lv := range tlset {
lbls = append(lbls, labels.Label{Name: string(ln), Value: string(lv)})
}
for ln, lv := range tg.Labels { for ln, lv := range tg.Labels {
if _, ok := lset[ln]; !ok { if _, ok := tlset[ln]; !ok {
lset[ln] = lv lbls = append(lbls, labels.Label{Name: string(ln), Value: string(lv)})
} }
} }
labels, origLabels, err := populateLabels(lset, cfg)
lset := labels.New(lbls...)
lbls, origLabels, err := populateLabels(lset, cfg)
if err != nil { if err != nil {
return nil, fmt.Errorf("instance %d in group %s: %s", i, tg, err) return nil, fmt.Errorf("instance %d in group %s: %s", i, tg, err)
} }
if labels != nil { if lbls != nil {
targets = append(targets, NewTarget(labels, origLabels, cfg.Params)) targets = append(targets, NewTarget(lbls, origLabels, cfg.Params))
} }
} }
return targets, nil return targets, nil
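The appender wrappers and populateLabels above now assemble label sets with labels.NewBuilder instead of mutating a model.LabelSet in place. For reference, the ruleLabelsAppender merge rule restated as a plain function (illustrative only; "exported_" is the value of model.ExportedLabelPrefix):

package targetsketch

import (
	"github.com/prometheus/prometheus/pkg/labels"
)

// mergeTargetLabels applies the ruleLabelsAppender rule: target labels win,
// and a clashing scraped label is kept under an exported_ prefix.
func mergeTargetLabels(scraped, target labels.Labels) labels.Labels {
	lb := labels.NewBuilder(scraped)
	for _, l := range target {
		if lv := scraped.Get(l.Name); lv != "" {
			lb.Set("exported_"+l.Name, lv)
		}
		lb.Set(l.Name, l.Value)
	}
	return lb.Labels()
}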


@ -29,6 +29,7 @@ import (
"github.com/prometheus/common/model" "github.com/prometheus/common/model"
"github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/pkg/labels"
) )
const ( const (
@ -36,11 +37,8 @@ const (
) )
func TestTargetLabels(t *testing.T) { func TestTargetLabels(t *testing.T) {
target := newTestTarget("example.com:80", 0, model.LabelSet{"job": "some_job", "foo": "bar"}) target := newTestTarget("example.com:80", 0, labels.FromStrings("job", "some_job", "foo", "bar"))
want := model.LabelSet{ want := labels.FromStrings(model.JobLabel, "some_job", "foo", "bar")
model.JobLabel: "some_job",
"foo": "bar",
}
got := target.Labels() got := target.Labels()
if !reflect.DeepEqual(want, got) { if !reflect.DeepEqual(want, got) {
t.Errorf("want base labels %v, got %v", want, got) t.Errorf("want base labels %v, got %v", want, got)
@ -54,9 +52,9 @@ func TestTargetOffset(t *testing.T) {
// Calculate offsets for 10000 different targets. // Calculate offsets for 10000 different targets.
for i := range offsets { for i := range offsets {
target := newTestTarget("example.com:80", 0, model.LabelSet{ target := newTestTarget("example.com:80", 0, labels.FromStrings(
"label": model.LabelValue(fmt.Sprintf("%d", i)), "label", fmt.Sprintf("%d", i),
}) ))
offsets[i] = target.offset(interval) offsets[i] = target.offset(interval)
} }
@ -98,13 +96,13 @@ func TestTargetURL(t *testing.T) {
"abc": []string{"foo", "bar", "baz"}, "abc": []string{"foo", "bar", "baz"},
"xyz": []string{"hoo"}, "xyz": []string{"hoo"},
} }
labels := model.LabelSet{ labels := labels.FromMap(map[string]string{
model.AddressLabel: "example.com:1234", model.AddressLabel: "example.com:1234",
model.SchemeLabel: "https", model.SchemeLabel: "https",
model.MetricsPathLabel: "/metricz", model.MetricsPathLabel: "/metricz",
"__param_abc": "overwrite", "__param_abc": "overwrite",
"__param_cde": "huu", "__param_cde": "huu",
} })
target := NewTarget(labels, labels, params) target := NewTarget(labels, labels, params)
// The reserved labels are concatenated into a full URL. The first value for each // The reserved labels are concatenated into a full URL. The first value for each
@ -126,15 +124,13 @@ func TestTargetURL(t *testing.T) {
} }
} }
func newTestTarget(targetURL string, deadline time.Duration, labels model.LabelSet) *Target { func newTestTarget(targetURL string, deadline time.Duration, lbls labels.Labels) *Target {
labels = labels.Clone() lb := labels.NewBuilder(lbls)
labels[model.SchemeLabel] = "http" lb.Set(model.SchemeLabel, "http")
labels[model.AddressLabel] = model.LabelValue(strings.TrimLeft(targetURL, "http://")) lb.Set(model.AddressLabel, strings.TrimLeft(targetURL, "http://"))
labels[model.MetricsPathLabel] = "/metrics" lb.Set(model.MetricsPathLabel, "/metrics")
return &Target{ return &Target{labels: lb.Labels()}
labels: labels,
}
} }
func TestNewHTTPBearerToken(t *testing.T) { func TestNewHTTPBearerToken(t *testing.T) {


@ -28,7 +28,7 @@ import (
// creates the new targets based on the target groups it receives from various // creates the new targets based on the target groups it receives from various
// target providers. // target providers.
type TargetManager struct { type TargetManager struct {
appender storage.SampleAppender appender storage.Appender
scrapeConfigs []*config.ScrapeConfig scrapeConfigs []*config.ScrapeConfig
mtx sync.RWMutex mtx sync.RWMutex
@ -49,7 +49,7 @@ type targetSet struct {
} }
// NewTargetManager creates a new TargetManager. // NewTargetManager creates a new TargetManager.
func NewTargetManager(app storage.SampleAppender) *TargetManager { func NewTargetManager(app storage.Appender) *TargetManager {
return &TargetManager{ return &TargetManager{
appender: app, appender: app,
targetSets: map[string]*targetSet{}, targetSets: map[string]*targetSet{},


@ -19,6 +19,7 @@ import (
"github.com/prometheus/common/model" "github.com/prometheus/common/model"
"github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/pkg/labels"
) )
func mustNewRegexp(s string) config.Regexp { func mustNewRegexp(s string) config.Regexp {
@ -31,98 +32,98 @@ func mustNewRegexp(s string) config.Regexp {
func TestPopulateLabels(t *testing.T) { func TestPopulateLabels(t *testing.T) {
cases := []struct { cases := []struct {
in model.LabelSet in labels.Labels
cfg *config.ScrapeConfig cfg *config.ScrapeConfig
res model.LabelSet res labels.Labels
resOrig model.LabelSet resOrig labels.Labels
}{ }{
// Regular population of scrape config options. // Regular population of scrape config options.
{ {
in: model.LabelSet{ in: labels.FromMap(map[string]string{
model.AddressLabel: "1.2.3.4:1000", model.AddressLabel: "1.2.3.4:1000",
"custom": "value", "custom": "value",
}, }),
cfg: &config.ScrapeConfig{ cfg: &config.ScrapeConfig{
Scheme: "https", Scheme: "https",
MetricsPath: "/metrics", MetricsPath: "/metrics",
JobName: "job", JobName: "job",
}, },
res: model.LabelSet{ res: labels.FromMap(map[string]string{
model.AddressLabel: "1.2.3.4:1000", model.AddressLabel: "1.2.3.4:1000",
model.InstanceLabel: "1.2.3.4:1000", model.InstanceLabel: "1.2.3.4:1000",
model.SchemeLabel: "https", model.SchemeLabel: "https",
model.MetricsPathLabel: "/metrics", model.MetricsPathLabel: "/metrics",
model.JobLabel: "job", model.JobLabel: "job",
"custom": "value", "custom": "value",
}, }),
resOrig: model.LabelSet{ resOrig: labels.FromMap(map[string]string{
model.AddressLabel: "1.2.3.4:1000", model.AddressLabel: "1.2.3.4:1000",
model.SchemeLabel: "https", model.SchemeLabel: "https",
model.MetricsPathLabel: "/metrics", model.MetricsPathLabel: "/metrics",
model.JobLabel: "job", model.JobLabel: "job",
"custom": "value", "custom": "value",
}, }),
}, },
// Pre-define/overwrite scrape config labels. // Pre-define/overwrite scrape config labels.
// Leave out port and expect it to be defaulted to scheme. // Leave out port and expect it to be defaulted to scheme.
{ {
in: model.LabelSet{ in: labels.FromMap(map[string]string{
model.AddressLabel: "1.2.3.4", model.AddressLabel: "1.2.3.4",
model.SchemeLabel: "http", model.SchemeLabel: "http",
model.MetricsPathLabel: "/custom", model.MetricsPathLabel: "/custom",
model.JobLabel: "custom-job", model.JobLabel: "custom-job",
}, }),
cfg: &config.ScrapeConfig{ cfg: &config.ScrapeConfig{
Scheme: "https", Scheme: "https",
MetricsPath: "/metrics", MetricsPath: "/metrics",
JobName: "job", JobName: "job",
}, },
res: model.LabelSet{ res: labels.FromMap(map[string]string{
model.AddressLabel: "1.2.3.4:80", model.AddressLabel: "1.2.3.4:80",
model.InstanceLabel: "1.2.3.4:80", model.InstanceLabel: "1.2.3.4:80",
model.SchemeLabel: "http", model.SchemeLabel: "http",
model.MetricsPathLabel: "/custom", model.MetricsPathLabel: "/custom",
model.JobLabel: "custom-job", model.JobLabel: "custom-job",
}, }),
resOrig: model.LabelSet{ resOrig: labels.FromMap(map[string]string{
model.AddressLabel: "1.2.3.4", model.AddressLabel: "1.2.3.4",
model.SchemeLabel: "http", model.SchemeLabel: "http",
model.MetricsPathLabel: "/custom", model.MetricsPathLabel: "/custom",
model.JobLabel: "custom-job", model.JobLabel: "custom-job",
}, }),
}, },
// Provide instance label. HTTPS port default for IPv6. // Provide instance label. HTTPS port default for IPv6.
{ {
in: model.LabelSet{ in: labels.FromMap(map[string]string{
model.AddressLabel: "[::1]", model.AddressLabel: "[::1]",
model.InstanceLabel: "custom-instance", model.InstanceLabel: "custom-instance",
}, }),
cfg: &config.ScrapeConfig{ cfg: &config.ScrapeConfig{
Scheme: "https", Scheme: "https",
MetricsPath: "/metrics", MetricsPath: "/metrics",
JobName: "job", JobName: "job",
}, },
res: model.LabelSet{ res: labels.FromMap(map[string]string{
model.AddressLabel: "[::1]:443", model.AddressLabel: "[::1]:443",
model.InstanceLabel: "custom-instance", model.InstanceLabel: "custom-instance",
model.SchemeLabel: "https", model.SchemeLabel: "https",
model.MetricsPathLabel: "/metrics", model.MetricsPathLabel: "/metrics",
model.JobLabel: "job", model.JobLabel: "job",
}, }),
resOrig: model.LabelSet{ resOrig: labels.FromMap(map[string]string{
model.AddressLabel: "[::1]", model.AddressLabel: "[::1]",
model.InstanceLabel: "custom-instance", model.InstanceLabel: "custom-instance",
model.SchemeLabel: "https", model.SchemeLabel: "https",
model.MetricsPathLabel: "/metrics", model.MetricsPathLabel: "/metrics",
model.JobLabel: "job", model.JobLabel: "job",
}, }),
}, },
// Apply relabeling. // Apply relabeling.
{ {
in: model.LabelSet{ in: labels.FromMap(map[string]string{
model.AddressLabel: "1.2.3.4:1000", model.AddressLabel: "1.2.3.4:1000",
"custom": "value", "custom": "value",
}, }),
cfg: &config.ScrapeConfig{ cfg: &config.ScrapeConfig{
Scheme: "https", Scheme: "https",
MetricsPath: "/metrics", MetricsPath: "/metrics",


@ -20,7 +20,6 @@ import (
"path/filepath" "path/filepath"
"sync" "sync"
"time" "time"
"unsafe"
html_template "html/template" html_template "html/template"
@ -33,7 +32,6 @@ import (
"github.com/prometheus/prometheus/notifier" "github.com/prometheus/prometheus/notifier"
"github.com/prometheus/prometheus/promql" "github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/storage/local"
"github.com/prometheus/prometheus/util/strutil" "github.com/prometheus/prometheus/util/strutil"
) )
@ -150,10 +148,7 @@ func (g *Group) run() {
iter := func() { iter := func() {
iterationsScheduled.Inc() iterationsScheduled.Inc()
if g.opts.SampleAppender.NeedsThrottling() {
iterationsSkipped.Inc()
return
}
start := time.Now() start := time.Now()
g.Eval() g.Eval()
@ -281,19 +276,12 @@ func (g *Group) Eval() {
) )
for _, s := range vector { for _, s := range vector {
// TODO(fabxc): adjust after reworking appending. if err := g.opts.SampleAppender.Add(s.Metric, s.T, s.V); err != nil {
var ms model.Sample
lbls := s.Metric.Map()
ms.Metric = *(*model.Metric)(unsafe.Pointer(&lbls))
ms.Timestamp = model.Time(s.T)
ms.Value = model.SampleValue(s.V)
if err := g.opts.SampleAppender.Append(&ms); err != nil {
switch err { switch err {
case local.ErrOutOfOrderSample: case storage.ErrOutOfOrderSample:
numOutOfOrder++ numOutOfOrder++
log.With("sample", s).With("error", err).Debug("Rule evaluation result discarded") log.With("sample", s).With("error", err).Debug("Rule evaluation result discarded")
case local.ErrDuplicateSampleForTimestamp: case storage.ErrDuplicateSampleForTimestamp:
numDuplicates++ numDuplicates++
log.With("sample", s).With("error", err).Debug("Rule evaluation result discarded") log.With("sample", s).With("error", err).Debug("Rule evaluation result discarded")
default: default:
@ -307,6 +295,9 @@ func (g *Group) Eval() {
if numDuplicates > 0 { if numDuplicates > 0 {
log.With("numDropped", numDuplicates).Warn("Error on ingesting results from rule evaluation with different value but same timestamp") log.With("numDropped", numDuplicates).Warn("Error on ingesting results from rule evaluation with different value but same timestamp")
} }
if err := g.opts.SampleAppender.Commit(); err != nil {
log.With("err", err).Warn("rule sample appending failed")
}
}(rule) }(rule)
} }
wg.Wait() wg.Wait()
@ -356,7 +347,7 @@ type ManagerOptions struct {
QueryEngine *promql.Engine QueryEngine *promql.Engine
Context context.Context Context context.Context
Notifier *notifier.Notifier Notifier *notifier.Notifier
SampleAppender storage.SampleAppender SampleAppender storage.Appender
} }
// NewManager returns an implementation of Manager, ready to be started // NewManager returns an implementation of Manager, ready to be started


@ -52,7 +52,7 @@ type Querier interface {
// Appender provides batched appends against a storage. // Appender provides batched appends against a storage.
type Appender interface { type Appender interface {
// Add adds a sample pair for the referenced series. // Add adds a sample pair for the referenced series.
Add(lset labels.Labels, t int64, v float64) Add(lset labels.Labels, t int64, v float64) error
// Commit submits the collected samples and purges the batch. // Commit submits the collected samples and purges the batch.
Commit() error Commit() error
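Add now returns an error per sample and Commit flushes the batch, so every ingester (scrape loop, rule group) follows the same add-then-commit pattern. A minimal caller sketch (everything except the interface methods is illustrative):

package storagesketch

import (
	"github.com/prometheus/prometheus/pkg/labels"
	"github.com/prometheus/prometheus/storage"
)

// appendAndCommit adds a single sample and commits immediately; real callers
// batch many Add calls before a single Commit.
func appendAndCommit(app storage.Appender, lset labels.Labels, t int64, v float64) error {
	if err := app.Add(lset, t, v); err != nil {
		return err // e.g. storage.ErrOutOfOrderSample
	}
	return app.Commit()
}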


@ -24,7 +24,6 @@ func Open(path string) (storage.Storage, error) {
} }
func (a adapter) Querier(mint, maxt int64) (storage.Querier, error) { func (a adapter) Querier(mint, maxt int64) (storage.Querier, error) {
// fmt.Println("new querier at", timestamp.Time(mint), timestamp.Time(maxt), maxt-mint)
return querier{q: a.db.Querier(mint, maxt)}, nil return querier{q: a.db.Querier(mint, maxt)}, nil
} }
@ -74,9 +73,8 @@ type appender struct {
a tsdb.Appender a tsdb.Appender
} }
func (a appender) Add(lset labels.Labels, t int64, v float64) { func (a appender) Add(lset labels.Labels, t int64, v float64) error {
// fmt.Println("add", lset, timestamp.Time(t), v) return a.a.Add(toTSDBLabels(lset), t, v)
a.a.Add(toTSDBLabels(lset), t, v)
} }
func (a appender) Commit() error { return a.a.Commit() } func (a appender) Commit() error { return a.a.Commit() }


@ -19,7 +19,6 @@ import (
"fmt" "fmt"
"math" "math"
"net/http" "net/http"
"sort"
"strconv" "strconv"
"time" "time"
@ -28,9 +27,10 @@ import (
"github.com/prometheus/common/route" "github.com/prometheus/common/route"
"golang.org/x/net/context" "golang.org/x/net/context"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/promql" "github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/retrieval" "github.com/prometheus/prometheus/retrieval"
"github.com/prometheus/prometheus/storage/local" "github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/util/httputil" "github.com/prometheus/prometheus/util/httputil"
) )
@ -90,7 +90,7 @@ type apiFunc func(r *http.Request) (interface{}, *apiError)
// API can register a set of endpoints in a router and handle // API can register a set of endpoints in a router and handle
// them using the provided storage and query engine. // them using the provided storage and query engine.
type API struct { type API struct {
Storage local.Storage Storage storage.Storage
QueryEngine *promql.Engine QueryEngine *promql.Engine
targetRetriever targetRetriever targetRetriever targetRetriever
@ -100,7 +100,7 @@ type API struct {
} }
// NewAPI returns an initialized API type. // NewAPI returns an initialized API type.
func NewAPI(qe *promql.Engine, st local.Storage, tr targetRetriever) *API { func NewAPI(qe *promql.Engine, st storage.Storage, tr targetRetriever) *API {
return &API{ return &API{
QueryEngine: qe, QueryEngine: qe,
Storage: st, Storage: st,
@ -229,6 +229,7 @@ func (api *API) queryRange(r *http.Request) (interface{}, *apiError) {
} }
return nil, &apiError{errorExec, res.Err} return nil, &apiError{errorExec, res.Err}
} }
return &queryData{ return &queryData{
ResultType: res.Value.Type(), ResultType: res.Value.Type(),
Result: res.Value, Result: res.Value,
@ -241,17 +242,17 @@ func (api *API) labelValues(r *http.Request) (interface{}, *apiError) {
if !model.LabelNameRE.MatchString(name) { if !model.LabelNameRE.MatchString(name) {
return nil, &apiError{errorBadData, fmt.Errorf("invalid label name: %q", name)} return nil, &apiError{errorBadData, fmt.Errorf("invalid label name: %q", name)}
} }
q, err := api.Storage.Querier() q, err := api.Storage.Querier(math.MinInt64, math.MaxInt64)
if err != nil { if err != nil {
return nil, &apiError{errorExec, err} return nil, &apiError{errorExec, err}
} }
defer q.Close() defer q.Close()
vals, err := q.LabelValuesForLabelName(api.context(r), model.LabelName(name)) // TODO(fabxc): add back request context.
vals, err := q.LabelValues(name)
if err != nil { if err != nil {
return nil, &apiError{errorExec, err} return nil, &apiError{errorExec, err}
} }
sort.Sort(vals)
return vals, nil return vals, nil
} }
@ -284,7 +285,7 @@ func (api *API) series(r *http.Request) (interface{}, *apiError) {
end = time.Unix(math.MaxInt64, 0) end = time.Unix(math.MaxInt64, 0)
} }
var matcherSets [][]*promql.LabelMatcher var matcherSets [][]*labels.Matcher
for _, s := range r.Form["match[]"] { for _, s := range r.Form["match[]"] {
matchers, err := promql.ParseMetricSelector(s) matchers, err := promql.ParseMetricSelector(s)
if err != nil { if err != nil {
@ -347,9 +348,9 @@ func (api *API) dropSeries(r *http.Request) (interface{}, *apiError) {
type Target struct { type Target struct {
// Labels before any processing. // Labels before any processing.
DiscoveredLabels model.LabelSet `json:"discoveredLabels"` DiscoveredLabels map[string]string `json:"discoveredLabels"`
// Any labels that are added to this target and its metrics. // Any labels that are added to this target and its metrics.
Labels model.LabelSet `json:"labels"` Labels map[string]string `json:"labels"`
ScrapeUrl string `json:"scrapeUrl"` ScrapeUrl string `json:"scrapeUrl"`
@ -370,8 +371,8 @@ func (api *API) targets(r *http.Request) (interface{}, *apiError) {
} }
res[i] = &Target{ res[i] = &Target{
DiscoveredLabels: t.DiscoveredLabels(), DiscoveredLabels: t.DiscoveredLabels().Map(),
Labels: t.Labels(), Labels: t.Labels().Map(),
ScrapeUrl: t.URL().String(), ScrapeUrl: t.URL().String(),
LastError: lastErrStr, LastError: lastErrStr,
LastScrape: t.LastScrape(), LastScrape: t.LastScrape(),


@ -21,6 +21,7 @@ import (
"github.com/prometheus/common/expfmt" "github.com/prometheus/common/expfmt"
"github.com/prometheus/common/model" "github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/promql" "github.com/prometheus/prometheus/promql"
) )
@ -37,7 +38,7 @@ func (h *Handler) federation(w http.ResponseWriter, req *http.Request) {
req.ParseForm() req.ParseForm()
var matcherSets [][]*promql.LabelMatcher var matcherSets [][]*labels.Matcher
for _, s := range req.Form["match[]"] { for _, s := range req.Form["match[]"] {
matchers, err := promql.ParseMetricSelector(s) matchers, err := promql.ParseMetricSelector(s)
if err != nil { if err != nil {


@ -44,7 +44,7 @@ import (
"github.com/prometheus/prometheus/promql" "github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/retrieval" "github.com/prometheus/prometheus/retrieval"
"github.com/prometheus/prometheus/rules" "github.com/prometheus/prometheus/rules"
"github.com/prometheus/prometheus/storage/local" "github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/template" "github.com/prometheus/prometheus/template"
"github.com/prometheus/prometheus/util/httputil" "github.com/prometheus/prometheus/util/httputil"
api_v1 "github.com/prometheus/prometheus/web/api/v1" api_v1 "github.com/prometheus/prometheus/web/api/v1"
@ -59,7 +59,7 @@ type Handler struct {
ruleManager *rules.Manager ruleManager *rules.Manager
queryEngine *promql.Engine queryEngine *promql.Engine
context context.Context context context.Context
storage local.Storage storage storage.Storage
notifier *notifier.Notifier notifier *notifier.Notifier
apiV1 *api_v1.API apiV1 *api_v1.API
@ -104,7 +104,7 @@ type PrometheusVersion struct {
// Options for the web Handler. // Options for the web Handler.
type Options struct { type Options struct {
Context context.Context Context context.Context
Storage local.Storage Storage storage.Storage
QueryEngine *promql.Engine QueryEngine *promql.Engine
TargetManager *retrieval.TargetManager TargetManager *retrieval.TargetManager
RuleManager *rules.Manager RuleManager *rules.Manager
@ -375,7 +375,7 @@ func (h *Handler) targets(w http.ResponseWriter, r *http.Request) {
// Bucket targets by job label // Bucket targets by job label
tps := map[string][]retrieval.Target{} tps := map[string][]retrieval.Target{}
for _, t := range h.targetManager.Targets() { for _, t := range h.targetManager.Targets() {
job := string(t.Labels()[model.JobLabel]) job := t.Labels().Get(model.JobLabel)
tps[job] = append(tps[job], t) tps[job] = append(tps[job], t)
} }