From f8fc1f5bb29edb6cc815feda2a27e786a4cacbe9 Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Thu, 29 Dec 2016 09:27:30 +0100 Subject: [PATCH] *: migrate ingestion to new batch Appender --- cmd/prometheus/main.go | 34 +++---- pkg/labels/labels.go | 5 + promql/engine.go | 13 ++- promql/test.go | 5 + promql/value.go | 36 ++++++- retrieval/helpers_test.go | 36 ++++--- retrieval/scrape.go | 172 ++++++++++++++++++-------------- retrieval/scrape_test.go | 96 +++++++++--------- retrieval/target.go | 164 ++++++++++++++++-------------- retrieval/target_test.go | 32 +++--- retrieval/targetmanager.go | 4 +- retrieval/targetmanager_test.go | 47 ++++----- rules/manager.go | 25 ++--- storage/interface.go | 2 +- storage/tsdb/tsdb.go | 6 +- web/api/v1/api.go | 25 ++--- web/federate.go | 3 +- web/web.go | 8 +- 18 files changed, 386 insertions(+), 327 deletions(-) diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index a96ecbafcc..4fb3a5e5e9 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -23,7 +23,6 @@ import ( "syscall" "time" - "github.com/fabxc/tsdb" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/log" "github.com/prometheus/common/version" @@ -34,9 +33,7 @@ import ( "github.com/prometheus/prometheus/promql" "github.com/prometheus/prometheus/retrieval" "github.com/prometheus/prometheus/rules" - "github.com/prometheus/prometheus/storage" - "github.com/prometheus/prometheus/storage/local" - "github.com/prometheus/prometheus/storage/remote" + "github.com/prometheus/prometheus/storage/tsdb" "github.com/prometheus/prometheus/web" ) @@ -77,19 +74,25 @@ func Main() int { log.Infoln("Build context", version.BuildContext()) var ( - sampleAppender = storage.Fanout{} - reloadables []Reloadable + // sampleAppender = storage.Fanout{} + reloadables []Reloadable ) - _, err := tsdb.Open(cfg.localStoragePath, nil, nil) + localStorage, err := tsdb.Open(cfg.localStoragePath) if err != nil { log.Errorf("Opening storage failed: %s", err) + return 1 } - var localStorage local.Storage - reloadableRemoteStorage := remote.New() - sampleAppender = append(sampleAppender, reloadableRemoteStorage) - reloadables = append(reloadables, reloadableRemoteStorage) + sampleAppender, err := localStorage.Appender() + if err != nil { + log.Errorf("Creating sample appender failed: %s", err) + return 1 + } + + // reloadableRemoteStorage := remote.New() + // sampleAppender = append(sampleAppender, reloadableRemoteStorage) + // reloadables = append(reloadables, reloadableRemoteStorage) var ( notifier = notifier.New(&cfg.notifier) @@ -162,18 +165,13 @@ func Main() int { }() // Start all components. The order is NOT arbitrary. - - if err := localStorage.Start(); err != nil { - log.Errorln("Error opening memory series storage:", err) - return 1 - } defer func() { - if err := localStorage.Stop(); err != nil { + if err := localStorage.Close(); err != nil { log.Errorln("Error stopping storage:", err) } }() - defer reloadableRemoteStorage.Stop() + // defer reloadableRemoteStorage.Stop() // The storage has to be fully initialized before registering. if instrumentedStorage, ok := localStorage.(prometheus.Collector); ok { diff --git a/pkg/labels/labels.go b/pkg/labels/labels.go index 03e8a94f66..4300b31fea 100644 --- a/pkg/labels/labels.go +++ b/pkg/labels/labels.go @@ -2,6 +2,7 @@ package labels import ( "bytes" + "encoding/json" "sort" "strconv" "strings" @@ -48,6 +49,10 @@ func (ls Labels) String() string { return b.String() } +func (ls Labels) MarshalJSON() ([]byte, error) { + return json.Marshal(ls.Map()) +} + // Hash returns a hash value for the label set. func (ls Labels) Hash() uint64 { b := make([]byte, 0, 1024) diff --git a/promql/engine.go b/promql/engine.go index 86a681ccfe..7fd8a82baf 100644 --- a/promql/engine.go +++ b/promql/engine.go @@ -336,6 +336,7 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *EvalStmt) ( Seriess[h] = ss } ss.Points = append(ss.Points, sample.Point) + Seriess[h] = ss } default: panic(fmt.Errorf("promql.Engine.exec: invalid expression type %q", val.Type())) @@ -358,15 +359,13 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *EvalStmt) ( return nil, err } - // Turn Matrix type with protected metric into model.Matrix. - resMatrix := mat - // TODO(fabxc): order ensured by storage? - // sortTimer := query.stats.GetTimer(stats.ResultSortTime).Start() - // sort.Sort(resMatrix) - // sortTimer.Stop() + // TODO(fabxc): where to ensure metric labels are a copy from the storage internals. + sortTimer := query.stats.GetTimer(stats.ResultSortTime).Start() + sort.Sort(mat) + sortTimer.Stop() - return resMatrix, nil + return mat, nil } func (ng *Engine) populateIterators(ctx context.Context, s *EvalStmt) (storage.Querier, error) { diff --git a/promql/test.go b/promql/test.go index 7b3bbe9884..8db392d628 100644 --- a/promql/test.go +++ b/promql/test.go @@ -377,6 +377,11 @@ func (ev *evalCmd) compareResult(result Value) error { return fmt.Errorf("received instant result on range evaluation") } + fmt.Println("vector result", len(val), ev.expr) + for _, ss := range val { + fmt.Println(" ", ss.Metric, ss.Point) + } + seen := map[uint64]bool{} for pos, v := range val { fp := v.Metric.Hash() diff --git a/promql/value.go b/promql/value.go index 95eaaebd49..59bac51259 100644 --- a/promql/value.go +++ b/promql/value.go @@ -1,7 +1,9 @@ package promql import ( + "encoding/json" "fmt" + "strconv" "strings" "github.com/prometheus/prometheus/pkg/labels" @@ -40,6 +42,10 @@ func (s String) String() string { return s.V } +func (s String) MarshalJSON() ([]byte, error) { + return json.Marshal([...]interface{}{float64(s.T) / 1000, s.V}) +} + // Scalar is a data point that's explicitly not associated with a metric. type Scalar struct { T int64 @@ -50,10 +56,15 @@ func (s Scalar) String() string { return fmt.Sprintf("scalar: %v @[%v]", s.V, s.T) } +func (s Scalar) MarshalJSON() ([]byte, error) { + v := strconv.FormatFloat(s.V, 'f', -1, 64) + return json.Marshal([...]interface{}{float64(s.T) / 1000, v}) +} + // Series is a stream of data points belonging to a metric. type Series struct { - Metric labels.Labels - Points []Point + Metric labels.Labels `json:"metric"` + Points []Point `json:"values"` } func (s Series) String() string { @@ -74,6 +85,12 @@ func (p Point) String() string { return fmt.Sprintf("%f @[%d]", p.V, p.T) } +// MarshalJSON implements json.Marshaler. +func (p Point) MarshalJSON() ([]byte, error) { + v := strconv.FormatFloat(p.V, 'f', -1, 64) + return json.Marshal([...]interface{}{float64(p.T) / 1000, v}) +} + // Sample is a single sample belonging to a metric. type Sample struct { Point @@ -85,6 +102,17 @@ func (s Sample) String() string { return fmt.Sprintf("%s => %s", s.Metric, s.Point) } +func (s Sample) MarshalJSON() ([]byte, error) { + v := struct { + M labels.Labels `json:"metric"` + V Point `json:"value"` + }{ + M: s.Metric, + V: s.Point, + } + return json.Marshal(v) +} + // Vector is basically only an alias for model.Samples, but the // contract is that in a Vector, all Samples have the same timestamp. type Vector []Sample @@ -112,6 +140,10 @@ func (m Matrix) String() string { return strings.Join(strs, "\n") } +func (m Matrix) Len() int { return len(m) } +func (m Matrix) Less(i, j int) bool { return labels.Compare(m[i].Metric, m[j].Metric) < 0 } +func (m Matrix) Swap(i, j int) { m[i], m[j] = m[j], m[i] } + // Result holds the resulting value of an execution or an error // if any occurred. type Result struct { diff --git a/retrieval/helpers_test.go b/retrieval/helpers_test.go index d19f6b41f1..43c130c217 100644 --- a/retrieval/helpers_test.go +++ b/retrieval/helpers_test.go @@ -14,38 +14,36 @@ package retrieval import ( - "github.com/prometheus/common/model" - "github.com/prometheus/prometheus/config" + "github.com/prometheus/prometheus/pkg/labels" ) type nopAppender struct{} -func (a nopAppender) Append(*model.Sample) error { - return nil -} - -func (a nopAppender) NeedsThrottling() bool { - return false -} +func (a nopAppender) Add(l labels.Labels, t int64, v float64) error { return nil } +func (a nopAppender) Commit() error { return nil } type collectResultAppender struct { - result model.Samples + result []sample throttled bool } -func (a *collectResultAppender) Append(s *model.Sample) error { - for ln, lv := range s.Metric { - if len(lv) == 0 { - delete(s.Metric, ln) - } - } - a.result = append(a.result, s) +func (a *collectResultAppender) Add(l labels.Labels, t int64, v float64) error { + // for ln, lv := range s.Metric { + // if len(lv) == 0 { + // delete(s.Metric, ln) + // } + // } + a.result = append(a.result, sample{ + metric: l, + t: t, + v: v, + }) return nil } -func (a *collectResultAppender) NeedsThrottling() bool { - return a.throttled +func (a *collectResultAppender) Commit() error { + return nil } // fakeTargetProvider implements a TargetProvider and allows manual injection diff --git a/retrieval/scrape.go b/retrieval/scrape.go index 0b4e4a8e91..1f29ae30ce 100644 --- a/retrieval/scrape.go +++ b/retrieval/scrape.go @@ -19,6 +19,7 @@ import ( "net/http" "sync" "time" + "unsafe" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/expfmt" @@ -28,8 +29,8 @@ import ( "golang.org/x/net/context/ctxhttp" "github.com/prometheus/prometheus/config" + "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/storage" - "github.com/prometheus/prometheus/storage/local" ) const ( @@ -88,7 +89,7 @@ func init() { // scrapePool manages scrapes for sets of targets. type scrapePool struct { - appender storage.SampleAppender + appender storage.Appender ctx context.Context @@ -101,10 +102,10 @@ type scrapePool struct { loops map[uint64]loop // Constructor for new scrape loops. This is settable for testing convenience. - newLoop func(context.Context, scraper, storage.SampleAppender, storage.SampleAppender) loop + newLoop func(context.Context, scraper, storage.Appender, storage.Appender) loop } -func newScrapePool(ctx context.Context, cfg *config.ScrapeConfig, app storage.SampleAppender) *scrapePool { +func newScrapePool(ctx context.Context, cfg *config.ScrapeConfig, app storage.Appender) *scrapePool { client, err := NewHTTPClient(cfg.HTTPClientConfig) if err != nil { // Any errors that could occur here should be caught during config validation. @@ -264,42 +265,42 @@ func (sp *scrapePool) sync(targets []*Target) { } // sampleAppender returns an appender for ingested samples from the target. -func (sp *scrapePool) sampleAppender(target *Target) storage.SampleAppender { +func (sp *scrapePool) sampleAppender(target *Target) storage.Appender { app := sp.appender // The relabelAppender has to be inside the label-modifying appenders // so the relabeling rules are applied to the correct label set. if mrc := sp.config.MetricRelabelConfigs; len(mrc) > 0 { app = relabelAppender{ - SampleAppender: app, - relabelings: mrc, + Appender: app, + relabelings: mrc, } } if sp.config.HonorLabels { app = honorLabelsAppender{ - SampleAppender: app, - labels: target.Labels(), + Appender: app, + labels: target.Labels(), } } else { app = ruleLabelsAppender{ - SampleAppender: app, - labels: target.Labels(), + Appender: app, + labels: target.Labels(), } } return app } // reportAppender returns an appender for reporting samples for the target. -func (sp *scrapePool) reportAppender(target *Target) storage.SampleAppender { +func (sp *scrapePool) reportAppender(target *Target) storage.Appender { return ruleLabelsAppender{ - SampleAppender: sp.appender, - labels: target.Labels(), + Appender: sp.appender, + labels: target.Labels(), } } // A scraper retrieves samples and accepts a status report at the end. type scraper interface { - scrape(ctx context.Context, ts time.Time) (model.Samples, error) + scrape(ctx context.Context, ts time.Time) (samples, error) report(start time.Time, dur time.Duration, err error) offset(interval time.Duration) time.Duration } @@ -312,7 +313,7 @@ type targetScraper struct { const acceptHeader = `application/vnd.google.protobuf;proto=io.prometheus.client.MetricFamily;encoding=delimited;q=0.7,text/plain;version=0.0.4;q=0.3,*/*;q=0.1` -func (s *targetScraper) scrape(ctx context.Context, ts time.Time) (model.Samples, error) { +func (s *targetScraper) scrape(ctx context.Context, ts time.Time) (samples, error) { req, err := http.NewRequest("GET", s.URL().String(), nil) if err != nil { return nil, err @@ -330,7 +331,7 @@ func (s *targetScraper) scrape(ctx context.Context, ts time.Time) (model.Samples } var ( - allSamples = make(model.Samples, 0, 200) + allSamples = make(samples, 0, 200) decSamples = make(model.Vector, 0, 50) ) sdec := expfmt.SampleDecoder{ @@ -344,7 +345,13 @@ func (s *targetScraper) scrape(ctx context.Context, ts time.Time) (model.Samples if err = sdec.Decode(&decSamples); err != nil { break } - allSamples = append(allSamples, decSamples...) + for _, s := range decSamples { + allSamples = append(allSamples, sample{ + metric: labels.FromMap(*(*map[string]string)(unsafe.Pointer(&s.Metric))), + t: int64(s.Timestamp), + v: float64(s.Value), + }) + } decSamples = decSamples[:0] } @@ -364,15 +371,15 @@ type loop interface { type scrapeLoop struct { scraper scraper - appender storage.SampleAppender - reportAppender storage.SampleAppender + appender storage.Appender + reportAppender storage.Appender done chan struct{} ctx context.Context cancel func() } -func newScrapeLoop(ctx context.Context, sc scraper, app, reportApp storage.SampleAppender) loop { +func newScrapeLoop(ctx context.Context, sc scraper, app, reportApp storage.Appender) loop { sl := &scrapeLoop{ scraper: sc, appender: app, @@ -406,32 +413,28 @@ func (sl *scrapeLoop) run(interval, timeout time.Duration, errc chan<- error) { default: } - if !sl.appender.NeedsThrottling() { - var ( - start = time.Now() - scrapeCtx, _ = context.WithTimeout(sl.ctx, timeout) + var ( + start = time.Now() + scrapeCtx, _ = context.WithTimeout(sl.ctx, timeout) + ) + + // Only record after the first scrape. + if !last.IsZero() { + targetIntervalLength.WithLabelValues(interval.String()).Observe( + time.Since(last).Seconds(), ) - - // Only record after the first scrape. - if !last.IsZero() { - targetIntervalLength.WithLabelValues(interval.String()).Observe( - time.Since(last).Seconds(), - ) - } - - samples, err := sl.scraper.scrape(scrapeCtx, start) - if err == nil { - sl.append(samples) - } else if errc != nil { - errc <- err - } - - sl.report(start, time.Since(start), len(samples), err) - last = start - } else { - targetSkippedScrapes.Inc() } + samples, err := sl.scraper.scrape(scrapeCtx, start) + if err == nil { + sl.append(samples) + } else if errc != nil { + errc <- err + } + + sl.report(start, time.Since(start), len(samples), err) + last = start + select { case <-sl.ctx.Done(): return @@ -445,19 +448,40 @@ func (sl *scrapeLoop) stop() { <-sl.done } -func (sl *scrapeLoop) append(samples model.Samples) { +type sample struct { + metric labels.Labels + t int64 + v float64 +} + +type samples []sample + +func (s samples) Len() int { return len(s) } +func (s samples) Swap(i, j int) { s[i], s[j] = s[j], s[i] } + +func (s samples) Less(i, j int) bool { + d := labels.Compare(s[i].metric, s[j].metric) + if d < 0 { + return true + } else if d > 0 { + return false + } + return s[i].t < s[j].t +} + +func (sl *scrapeLoop) append(samples samples) { var ( numOutOfOrder = 0 numDuplicates = 0 ) for _, s := range samples { - if err := sl.appender.Append(s); err != nil { + if err := sl.appender.Add(s.metric, s.t, s.v); err != nil { switch err { - case local.ErrOutOfOrderSample: + case storage.ErrOutOfOrderSample: numOutOfOrder++ log.With("sample", s).With("error", err).Debug("Sample discarded") - case local.ErrDuplicateSampleForTimestamp: + case storage.ErrDuplicateSampleForTimestamp: numDuplicates++ log.With("sample", s).With("error", err).Debug("Sample discarded") default: @@ -471,47 +495,41 @@ func (sl *scrapeLoop) append(samples model.Samples) { if numDuplicates > 0 { log.With("numDropped", numDuplicates).Warn("Error on ingesting samples with different value but same timestamp") } + + if err := sl.appender.Commit(); err != nil { + log.With("err", err).Warn("Error commiting scrape") + } } func (sl *scrapeLoop) report(start time.Time, duration time.Duration, scrapedSamples int, err error) { sl.scraper.report(start, duration, err) - ts := model.TimeFromUnixNano(start.UnixNano()) + ts := int64(model.TimeFromUnixNano(start.UnixNano())) - var health model.SampleValue + var health float64 if err == nil { health = 1 } - healthSample := &model.Sample{ - Metric: model.Metric{ - model.MetricNameLabel: scrapeHealthMetricName, - }, - Timestamp: ts, - Value: health, - } - durationSample := &model.Sample{ - Metric: model.Metric{ - model.MetricNameLabel: scrapeDurationMetricName, - }, - Timestamp: ts, - Value: model.SampleValue(duration.Seconds()), - } - countSample := &model.Sample{ - Metric: model.Metric{ - model.MetricNameLabel: scrapeSamplesMetricName, - }, - Timestamp: ts, - Value: model.SampleValue(scrapedSamples), - } + var ( + healthMet = labels.Labels{ + labels.Label{Name: labels.MetricName, Value: scrapeHealthMetricName}, + } + durationMet = labels.Labels{ + labels.Label{Name: labels.MetricName, Value: scrapeDurationMetricName}, + } + countMet = labels.Labels{ + labels.Label{Name: labels.MetricName, Value: scrapeSamplesMetricName}, + } + ) - if err := sl.reportAppender.Append(healthSample); err != nil { - log.With("sample", healthSample).With("error", err).Warn("Scrape health sample discarded") + if err := sl.reportAppender.Add(healthMet, ts, health); err != nil { + log.With("error", err).Warn("Scrape health sample discarded") } - if err := sl.reportAppender.Append(durationSample); err != nil { - log.With("sample", durationSample).With("error", err).Warn("Scrape duration sample discarded") + if err := sl.reportAppender.Add(durationMet, ts, duration.Seconds()); err != nil { + log.With("error", err).Warn("Scrape duration sample discarded") } - if err := sl.reportAppender.Append(countSample); err != nil { - log.With("sample", durationSample).With("error", err).Warn("Scrape sample count sample discarded") + if err := sl.reportAppender.Add(countMet, ts, float64(scrapedSamples)); err != nil { + log.With("error", err).Warn("Scrape sample count sample discarded") } } diff --git a/retrieval/scrape_test.go b/retrieval/scrape_test.go index aaa19132f0..f235be7aaa 100644 --- a/retrieval/scrape_test.go +++ b/retrieval/scrape_test.go @@ -29,6 +29,8 @@ import ( "golang.org/x/net/context" "github.com/prometheus/prometheus/config" + "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/pkg/timestamp" "github.com/prometheus/prometheus/storage" ) @@ -78,9 +80,7 @@ func TestScrapePoolStop(t *testing.T) { for i := 0; i < numTargets; i++ { t := &Target{ - labels: model.LabelSet{ - model.AddressLabel: model.LabelValue(fmt.Sprintf("example.com:%d", i)), - }, + labels: labels.FromStrings(model.AddressLabel, fmt.Sprintf("example.com:%d", i)), } l := &testLoop{} l.stopFunc = func() { @@ -139,7 +139,7 @@ func TestScrapePoolReload(t *testing.T) { } // On starting to run, new loops created on reload check whether their preceding // equivalents have been stopped. - newLoop := func(ctx context.Context, s scraper, app, reportApp storage.SampleAppender) loop { + newLoop := func(ctx context.Context, s scraper, app, reportApp storage.Appender) loop { l := &testLoop{} l.startFunc = func(interval, timeout time.Duration, errc chan<- error) { if interval != 3*time.Second { @@ -168,9 +168,7 @@ func TestScrapePoolReload(t *testing.T) { for i := 0; i < numTargets; i++ { t := &Target{ - labels: model.LabelSet{ - model.AddressLabel: model.LabelValue(fmt.Sprintf("example.com:%d", i)), - }, + labels: labels.FromStrings(model.AddressLabel, fmt.Sprintf("example.com:%d", i)), } l := &testLoop{} l.stopFunc = func() { @@ -240,8 +238,8 @@ func TestScrapePoolReportAppender(t *testing.T) { if !ok { t.Fatalf("Expected ruleLabelsAppender but got %T", wrapped) } - if rl.SampleAppender != app { - t.Fatalf("Expected base appender but got %T", rl.SampleAppender) + if rl.Appender != app { + t.Fatalf("Expected base appender but got %T", rl.Appender) } cfg.HonorLabels = true @@ -251,8 +249,8 @@ func TestScrapePoolReportAppender(t *testing.T) { if !ok { t.Fatalf("Expected ruleLabelsAppender but got %T", wrapped) } - if hl.SampleAppender != app { - t.Fatalf("Expected base appender but got %T", hl.SampleAppender) + if hl.Appender != app { + t.Fatalf("Expected base appender but got %T", hl.Appender) } } @@ -275,12 +273,12 @@ func TestScrapePoolSampleAppender(t *testing.T) { if !ok { t.Fatalf("Expected ruleLabelsAppender but got %T", wrapped) } - re, ok := rl.SampleAppender.(relabelAppender) + re, ok := rl.Appender.(relabelAppender) if !ok { - t.Fatalf("Expected relabelAppender but got %T", rl.SampleAppender) + t.Fatalf("Expected relabelAppender but got %T", rl.Appender) } - if re.SampleAppender != app { - t.Fatalf("Expected base appender but got %T", re.SampleAppender) + if re.Appender != app { + t.Fatalf("Expected base appender but got %T", re.Appender) } cfg.HonorLabels = true @@ -290,12 +288,12 @@ func TestScrapePoolSampleAppender(t *testing.T) { if !ok { t.Fatalf("Expected honorLabelsAppender but got %T", wrapped) } - re, ok = hl.SampleAppender.(relabelAppender) + re, ok = hl.Appender.(relabelAppender) if !ok { - t.Fatalf("Expected relabelAppender but got %T", hl.SampleAppender) + t.Fatalf("Expected relabelAppender but got %T", hl.Appender) } - if re.SampleAppender != app { - t.Fatalf("Expected base appender but got %T", re.SampleAppender) + if re.Appender != app { + t.Fatalf("Expected base appender but got %T", re.Appender) } } @@ -322,7 +320,7 @@ func TestScrapeLoopStop(t *testing.T) { } // Running the scrape loop must exit before calling the scraper even once. - scraper.scrapeFunc = func(context.Context, time.Time) (model.Samples, error) { + scraper.scrapeFunc = func(context.Context, time.Time) (samples, error) { t.Fatalf("scraper was called for terminated scrape loop") return nil, nil } @@ -386,7 +384,7 @@ func TestScrapeLoopRun(t *testing.T) { scraper.offsetDur = 0 block := make(chan struct{}) - scraper.scrapeFunc = func(ctx context.Context, ts time.Time) (model.Samples, error) { + scraper.scrapeFunc = func(ctx context.Context, ts time.Time) (samples, error) { select { case <-block: case <-ctx.Done(): @@ -444,39 +442,39 @@ func TestTargetScraperScrapeOK(t *testing.T) { ts := &targetScraper{ Target: &Target{ - labels: model.LabelSet{ - model.SchemeLabel: model.LabelValue(serverURL.Scheme), - model.AddressLabel: model.LabelValue(serverURL.Host), - }, + labels: labels.FromStrings( + model.SchemeLabel, serverURL.Scheme, + model.AddressLabel, serverURL.Host, + ), }, client: http.DefaultClient, } now := time.Now() - samples, err := ts.scrape(context.Background(), now) + smpls, err := ts.scrape(context.Background(), now) if err != nil { t.Fatalf("Unexpected scrape error: %s", err) } - expectedSamples := model.Samples{ - { - Metric: model.Metric{"__name__": "metric_a"}, - Timestamp: model.TimeFromUnixNano(now.UnixNano()), - Value: 1, + expectedSamples := samples{ + sample{ + metric: labels.FromStrings(labels.MetricName, "metric_a"), + t: timestamp.FromTime(now), + v: 1, }, - { - Metric: model.Metric{"__name__": "metric_b"}, - Timestamp: model.TimeFromUnixNano(now.UnixNano()), - Value: 2, + sample{ + metric: labels.FromStrings(labels.MetricName, "metric_b"), + t: timestamp.FromTime(now), + v: 2, }, } sort.Sort(expectedSamples) - sort.Sort(samples) + sort.Sort(smpls) - if !reflect.DeepEqual(samples, expectedSamples) { + if !reflect.DeepEqual(smpls, expectedSamples) { t.Errorf("Scraped samples did not match served metrics") t.Errorf("Expected: %v", expectedSamples) - t.Fatalf("Got: %v", samples) + t.Fatalf("Got: %v", smpls) } } @@ -497,10 +495,10 @@ func TestTargetScrapeScrapeCancel(t *testing.T) { ts := &targetScraper{ Target: &Target{ - labels: model.LabelSet{ - model.SchemeLabel: model.LabelValue(serverURL.Scheme), - model.AddressLabel: model.LabelValue(serverURL.Host), - }, + labels: labels.FromStrings( + model.SchemeLabel, serverURL.Scheme, + model.AddressLabel, serverURL.Host, + ), }, client: http.DefaultClient, } @@ -548,10 +546,10 @@ func TestTargetScrapeScrapeNotFound(t *testing.T) { ts := &targetScraper{ Target: &Target{ - labels: model.LabelSet{ - model.SchemeLabel: model.LabelValue(serverURL.Scheme), - model.AddressLabel: model.LabelValue(serverURL.Host), - }, + labels: labels.FromStrings( + model.SchemeLabel, serverURL.Scheme, + model.AddressLabel, serverURL.Host, + ), }, client: http.DefaultClient, } @@ -570,9 +568,9 @@ type testScraper struct { lastDuration time.Duration lastError error - samples model.Samples + samples samples scrapeErr error - scrapeFunc func(context.Context, time.Time) (model.Samples, error) + scrapeFunc func(context.Context, time.Time) (samples, error) } func (ts *testScraper) offset(interval time.Duration) time.Duration { @@ -585,7 +583,7 @@ func (ts *testScraper) report(start time.Time, duration time.Duration, err error ts.lastError = err } -func (ts *testScraper) scrape(ctx context.Context, t time.Time) (model.Samples, error) { +func (ts *testScraper) scrape(ctx context.Context, t time.Time) (samples, error) { if ts.scrapeFunc != nil { return ts.scrapeFunc(ctx, t) } diff --git a/retrieval/target.go b/retrieval/target.go index a9a793f9a8..4d93ad8459 100644 --- a/retrieval/target.go +++ b/retrieval/target.go @@ -27,7 +27,8 @@ import ( "github.com/prometheus/common/model" "github.com/prometheus/prometheus/config" - "github.com/prometheus/prometheus/relabel" + "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/pkg/relabel" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/util/httputil" ) @@ -45,9 +46,9 @@ const ( // Target refers to a singular HTTP or HTTPS endpoint. type Target struct { // Labels before any processing. - discoveredLabels model.LabelSet + discoveredLabels labels.Labels // Any labels that are added to this target and its metrics. - labels model.LabelSet + labels labels.Labels // Additional URL parmeters that are part of the target URL. params url.Values @@ -58,7 +59,7 @@ type Target struct { } // NewTarget creates a reasonably configured target for querying. -func NewTarget(labels, discoveredLabels model.LabelSet, params url.Values) *Target { +func NewTarget(labels, discoveredLabels labels.Labels, params url.Values) *Target { return &Target{ labels: labels, discoveredLabels: discoveredLabels, @@ -111,7 +112,7 @@ func (t *Target) String() string { // hash returns an identifying hash for the target. func (t *Target) hash() uint64 { h := fnv.New64a() - h.Write([]byte(t.labels.Fingerprint().String())) + h.Write([]byte(fmt.Sprintf("%016d", t.labels.Hash()))) h.Write([]byte(t.URL().String())) return h.Sum64() @@ -134,19 +135,21 @@ func (t *Target) offset(interval time.Duration) time.Duration { } // Labels returns a copy of the set of all public labels of the target. -func (t *Target) Labels() model.LabelSet { - lset := make(model.LabelSet, len(t.labels)) - for ln, lv := range t.labels { - if !strings.HasPrefix(string(ln), model.ReservedLabelPrefix) { - lset[ln] = lv +func (t *Target) Labels() labels.Labels { + lset := make(labels.Labels, 0, len(t.labels)) + for _, l := range t.labels { + if !strings.HasPrefix(l.Name, model.ReservedLabelPrefix) { + lset = append(lset, l) } } return lset } // DiscoveredLabels returns a copy of the target's labels before any processing. -func (t *Target) DiscoveredLabels() model.LabelSet { - return t.discoveredLabels.Clone() +func (t *Target) DiscoveredLabels() labels.Labels { + lset := make(labels.Labels, len(t.discoveredLabels)) + copy(lset, t.discoveredLabels) + return lset } // URL returns a copy of the target's URL. @@ -157,23 +160,23 @@ func (t *Target) URL() *url.URL { params[k] = make([]string, len(v)) copy(params[k], v) } - for k, v := range t.labels { - if !strings.HasPrefix(string(k), model.ParamLabelPrefix) { + for _, l := range t.labels { + if !strings.HasPrefix(l.Name, model.ParamLabelPrefix) { continue } - ks := string(k[len(model.ParamLabelPrefix):]) + ks := l.Name[len(model.ParamLabelPrefix):] if len(params[ks]) > 0 { - params[ks][0] = string(v) + params[ks][0] = string(l.Value) } else { - params[ks] = []string{string(v)} + params[ks] = []string{l.Value} } } return &url.URL{ - Scheme: string(t.labels[model.SchemeLabel]), - Host: string(t.labels[model.AddressLabel]), - Path: string(t.labels[model.MetricsPathLabel]), + Scheme: string(t.labels.Get(model.SchemeLabel)), + Host: string(t.labels.Get(model.AddressLabel)), + Path: string(t.labels.Get(model.MetricsPathLabel)), RawQuery: params.Encode(), } } @@ -226,92 +229,98 @@ func (ts Targets) Swap(i, j int) { ts[i], ts[j] = ts[j], ts[i] } // Merges the ingested sample's metric with the label set. On a collision the // value of the ingested label is stored in a label prefixed with 'exported_'. type ruleLabelsAppender struct { - storage.SampleAppender - labels model.LabelSet + storage.Appender + labels labels.Labels } -func (app ruleLabelsAppender) Append(s *model.Sample) error { - for ln, lv := range app.labels { - if v, ok := s.Metric[ln]; ok && v != "" { - s.Metric[model.ExportedLabelPrefix+ln] = v +func (app ruleLabelsAppender) Add(lset labels.Labels, t int64, v float64) error { + lb := labels.NewBuilder(lset) + + for _, l := range app.labels { + lv := lset.Get(l.Name) + if lv != "" { + lb.Set(model.ExportedLabelPrefix+l.Name, lv) } - s.Metric[ln] = lv + lb.Set(l.Name, l.Value) } - return app.SampleAppender.Append(s) + return app.Appender.Add(lb.Labels(), t, v) } type honorLabelsAppender struct { - storage.SampleAppender - labels model.LabelSet + storage.Appender + labels labels.Labels } // Merges the sample's metric with the given labels if the label is not // already present in the metric. // This also considers labels explicitly set to the empty string. -func (app honorLabelsAppender) Append(s *model.Sample) error { - for ln, lv := range app.labels { - if _, ok := s.Metric[ln]; !ok { - s.Metric[ln] = lv +func (app honorLabelsAppender) Add(lset labels.Labels, t int64, v float64) error { + lb := labels.NewBuilder(lset) + + for _, l := range app.labels { + if lv := lset.Get(l.Name); lv == "" { + lb.Set(l.Name, l.Value) } } - return app.SampleAppender.Append(s) + return app.Appender.Add(lb.Labels(), t, v) } // Applies a set of relabel configurations to the sample's metric // before actually appending it. type relabelAppender struct { - storage.SampleAppender + storage.Appender relabelings []*config.RelabelConfig } -func (app relabelAppender) Append(s *model.Sample) error { - labels := relabel.Process(model.LabelSet(s.Metric), app.relabelings...) +func (app relabelAppender) Add(lset labels.Labels, t int64, v float64) error { + lset = relabel.Process(lset, app.relabelings...) // Check if the timeseries was dropped. - if labels == nil { + if lset == nil { return nil } - s.Metric = model.Metric(labels) - - return app.SampleAppender.Append(s) + return app.Appender.Add(lset, t, v) } // populateLabels builds a label set from the given label set and scrape configuration. // It returns a label set before relabeling was applied as the second return value. // Returns a nil label set if the target is dropped during relabeling. -func populateLabels(lset model.LabelSet, cfg *config.ScrapeConfig) (res, orig model.LabelSet, err error) { - if _, ok := lset[model.AddressLabel]; !ok { +func populateLabels(lset labels.Labels, cfg *config.ScrapeConfig) (res, orig labels.Labels, err error) { + if v := lset.Get(model.AddressLabel); v == "" { return nil, nil, fmt.Errorf("no address") } - // Copy labels into the labelset for the target if they are not - // set already. Apply the labelsets in order of decreasing precedence. - scrapeLabels := model.LabelSet{ - model.SchemeLabel: model.LabelValue(cfg.Scheme), - model.MetricsPathLabel: model.LabelValue(cfg.MetricsPath), - model.JobLabel: model.LabelValue(cfg.JobName), + // Copy labels into the labelset for the target if they are not set already. + scrapeLabels := []labels.Label{ + {Name: model.JobLabel, Value: cfg.JobName}, + {Name: model.MetricsPathLabel, Value: cfg.MetricsPath}, + {Name: model.SchemeLabel, Value: cfg.Scheme}, } - for ln, lv := range scrapeLabels { - if _, ok := lset[ln]; !ok { - lset[ln] = lv + lb := labels.NewBuilder(lset) + + for _, l := range scrapeLabels { + if lv := lset.Get(l.Name); lv == "" { + lb.Set(l.Name, l.Value) } } // Encode scrape query parameters as labels. for k, v := range cfg.Params { if len(v) > 0 { - lset[model.LabelName(model.ParamLabelPrefix+k)] = model.LabelValue(v[0]) + lb.Set(model.ParamLabelPrefix+k, v[0]) } } - preRelabelLabels := lset.Clone() - lset = relabel.Process(lset, cfg.RelabelConfigs...) + preRelabelLabels := lb.Labels() + lset = relabel.Process(preRelabelLabels, cfg.RelabelConfigs...) // Check if the target was dropped. if lset == nil { return nil, nil, nil } + lb = labels.NewBuilder(lset) + // addPort checks whether we should add a default port to the address. // If the address is not valid, we don't append a port either. addPort := func(s string) bool { @@ -324,10 +333,11 @@ func populateLabels(lset model.LabelSet, cfg *config.ScrapeConfig) (res, orig mo _, _, err := net.SplitHostPort(s + ":1234") return err == nil } + addr := lset.Get(model.AddressLabel) // If it's an address with no trailing port, infer it based on the used scheme. - if addr := string(lset[model.AddressLabel]); addPort(addr) { + if addPort(addr) { // Addresses reaching this point are already wrapped in [] if necessary. - switch lset[model.SchemeLabel] { + switch lset.Get(model.SchemeLabel) { case "http", "": addr = addr + ":80" case "https": @@ -335,44 +345,52 @@ func populateLabels(lset model.LabelSet, cfg *config.ScrapeConfig) (res, orig mo default: return nil, nil, fmt.Errorf("invalid scheme: %q", cfg.Scheme) } - lset[model.AddressLabel] = model.LabelValue(addr) + lb.Set(model.AddressLabel, addr) } - if err := config.CheckTargetAddress(lset[model.AddressLabel]); err != nil { + + if err := config.CheckTargetAddress(model.LabelValue(addr)); err != nil { return nil, nil, err } // Meta labels are deleted after relabelling. Other internal labels propagate to // the target which decides whether they will be part of their label set. - for ln := range lset { - if strings.HasPrefix(string(ln), model.MetaLabelPrefix) { - delete(lset, ln) + for _, l := range lset { + if strings.HasPrefix(l.Name, model.MetaLabelPrefix) { + lb.Del(l.Name) } } // Default the instance label to the target address. - if _, ok := lset[model.InstanceLabel]; !ok { - lset[model.InstanceLabel] = lset[model.AddressLabel] + if v := lset.Get(model.InstanceLabel); v == "" { + lb.Set(model.InstanceLabel, addr) } - return lset, preRelabelLabels, nil + return lb.Labels(), preRelabelLabels, nil } // targetsFromGroup builds targets based on the given TargetGroup and config. func targetsFromGroup(tg *config.TargetGroup, cfg *config.ScrapeConfig) ([]*Target, error) { targets := make([]*Target, 0, len(tg.Targets)) - for i, lset := range tg.Targets { - // Combine target labels with target group labels. + for i, tlset := range tg.Targets { + lbls := make([]labels.Label, 0, len(tlset)+len(tg.Labels)) + + for ln, lv := range tlset { + lbls = append(lbls, labels.Label{Name: string(ln), Value: string(lv)}) + } for ln, lv := range tg.Labels { - if _, ok := lset[ln]; !ok { - lset[ln] = lv + if _, ok := tlset[ln]; !ok { + lbls = append(lbls, labels.Label{Name: string(ln), Value: string(lv)}) } } - labels, origLabels, err := populateLabels(lset, cfg) + + lset := labels.New(lbls...) + + lbls, origLabels, err := populateLabels(lset, cfg) if err != nil { return nil, fmt.Errorf("instance %d in group %s: %s", i, tg, err) } - if labels != nil { - targets = append(targets, NewTarget(labels, origLabels, cfg.Params)) + if lbls != nil { + targets = append(targets, NewTarget(lbls, origLabels, cfg.Params)) } } return targets, nil diff --git a/retrieval/target_test.go b/retrieval/target_test.go index a0f7c0237d..100ebc9bb3 100644 --- a/retrieval/target_test.go +++ b/retrieval/target_test.go @@ -29,6 +29,7 @@ import ( "github.com/prometheus/common/model" "github.com/prometheus/prometheus/config" + "github.com/prometheus/prometheus/pkg/labels" ) const ( @@ -36,11 +37,8 @@ const ( ) func TestTargetLabels(t *testing.T) { - target := newTestTarget("example.com:80", 0, model.LabelSet{"job": "some_job", "foo": "bar"}) - want := model.LabelSet{ - model.JobLabel: "some_job", - "foo": "bar", - } + target := newTestTarget("example.com:80", 0, labels.FromStrings("job", "some_job", "foo", "bar")) + want := labels.FromStrings(model.JobLabel, "some_job", "foo", "bar") got := target.Labels() if !reflect.DeepEqual(want, got) { t.Errorf("want base labels %v, got %v", want, got) @@ -54,9 +52,9 @@ func TestTargetOffset(t *testing.T) { // Calculate offsets for 10000 different targets. for i := range offsets { - target := newTestTarget("example.com:80", 0, model.LabelSet{ - "label": model.LabelValue(fmt.Sprintf("%d", i)), - }) + target := newTestTarget("example.com:80", 0, labels.FromStrings( + "label", fmt.Sprintf("%d", i), + )) offsets[i] = target.offset(interval) } @@ -98,13 +96,13 @@ func TestTargetURL(t *testing.T) { "abc": []string{"foo", "bar", "baz"}, "xyz": []string{"hoo"}, } - labels := model.LabelSet{ + labels := labels.FromMap(map[string]string{ model.AddressLabel: "example.com:1234", model.SchemeLabel: "https", model.MetricsPathLabel: "/metricz", "__param_abc": "overwrite", "__param_cde": "huu", - } + }) target := NewTarget(labels, labels, params) // The reserved labels are concatenated into a full URL. The first value for each @@ -126,15 +124,13 @@ func TestTargetURL(t *testing.T) { } } -func newTestTarget(targetURL string, deadline time.Duration, labels model.LabelSet) *Target { - labels = labels.Clone() - labels[model.SchemeLabel] = "http" - labels[model.AddressLabel] = model.LabelValue(strings.TrimLeft(targetURL, "http://")) - labels[model.MetricsPathLabel] = "/metrics" +func newTestTarget(targetURL string, deadline time.Duration, lbls labels.Labels) *Target { + lb := labels.NewBuilder(lbls) + lb.Set(model.SchemeLabel, "http") + lb.Set(model.AddressLabel, strings.TrimLeft(targetURL, "http://")) + lb.Set(model.MetricsPathLabel, "/metrics") - return &Target{ - labels: labels, - } + return &Target{labels: lb.Labels()} } func TestNewHTTPBearerToken(t *testing.T) { diff --git a/retrieval/targetmanager.go b/retrieval/targetmanager.go index 48d6411856..beab9b5b59 100644 --- a/retrieval/targetmanager.go +++ b/retrieval/targetmanager.go @@ -28,7 +28,7 @@ import ( // creates the new targets based on the target groups it receives from various // target providers. type TargetManager struct { - appender storage.SampleAppender + appender storage.Appender scrapeConfigs []*config.ScrapeConfig mtx sync.RWMutex @@ -49,7 +49,7 @@ type targetSet struct { } // NewTargetManager creates a new TargetManager. -func NewTargetManager(app storage.SampleAppender) *TargetManager { +func NewTargetManager(app storage.Appender) *TargetManager { return &TargetManager{ appender: app, targetSets: map[string]*targetSet{}, diff --git a/retrieval/targetmanager_test.go b/retrieval/targetmanager_test.go index dc4568a955..030776521e 100644 --- a/retrieval/targetmanager_test.go +++ b/retrieval/targetmanager_test.go @@ -19,6 +19,7 @@ import ( "github.com/prometheus/common/model" "github.com/prometheus/prometheus/config" + "github.com/prometheus/prometheus/pkg/labels" ) func mustNewRegexp(s string) config.Regexp { @@ -31,98 +32,98 @@ func mustNewRegexp(s string) config.Regexp { func TestPopulateLabels(t *testing.T) { cases := []struct { - in model.LabelSet + in labels.Labels cfg *config.ScrapeConfig - res model.LabelSet - resOrig model.LabelSet + res labels.Labels + resOrig labels.Labels }{ // Regular population of scrape config options. { - in: model.LabelSet{ + in: labels.FromMap(map[string]string{ model.AddressLabel: "1.2.3.4:1000", "custom": "value", - }, + }), cfg: &config.ScrapeConfig{ Scheme: "https", MetricsPath: "/metrics", JobName: "job", }, - res: model.LabelSet{ + res: labels.FromMap(map[string]string{ model.AddressLabel: "1.2.3.4:1000", model.InstanceLabel: "1.2.3.4:1000", model.SchemeLabel: "https", model.MetricsPathLabel: "/metrics", model.JobLabel: "job", "custom": "value", - }, - resOrig: model.LabelSet{ + }), + resOrig: labels.FromMap(map[string]string{ model.AddressLabel: "1.2.3.4:1000", model.SchemeLabel: "https", model.MetricsPathLabel: "/metrics", model.JobLabel: "job", "custom": "value", - }, + }), }, // Pre-define/overwrite scrape config labels. // Leave out port and expect it to be defaulted to scheme. { - in: model.LabelSet{ + in: labels.FromMap(map[string]string{ model.AddressLabel: "1.2.3.4", model.SchemeLabel: "http", model.MetricsPathLabel: "/custom", model.JobLabel: "custom-job", - }, + }), cfg: &config.ScrapeConfig{ Scheme: "https", MetricsPath: "/metrics", JobName: "job", }, - res: model.LabelSet{ + res: labels.FromMap(map[string]string{ model.AddressLabel: "1.2.3.4:80", model.InstanceLabel: "1.2.3.4:80", model.SchemeLabel: "http", model.MetricsPathLabel: "/custom", model.JobLabel: "custom-job", - }, - resOrig: model.LabelSet{ + }), + resOrig: labels.FromMap(map[string]string{ model.AddressLabel: "1.2.3.4", model.SchemeLabel: "http", model.MetricsPathLabel: "/custom", model.JobLabel: "custom-job", - }, + }), }, // Provide instance label. HTTPS port default for IPv6. { - in: model.LabelSet{ + in: labels.FromMap(map[string]string{ model.AddressLabel: "[::1]", model.InstanceLabel: "custom-instance", - }, + }), cfg: &config.ScrapeConfig{ Scheme: "https", MetricsPath: "/metrics", JobName: "job", }, - res: model.LabelSet{ + res: labels.FromMap(map[string]string{ model.AddressLabel: "[::1]:443", model.InstanceLabel: "custom-instance", model.SchemeLabel: "https", model.MetricsPathLabel: "/metrics", model.JobLabel: "job", - }, - resOrig: model.LabelSet{ + }), + resOrig: labels.FromMap(map[string]string{ model.AddressLabel: "[::1]", model.InstanceLabel: "custom-instance", model.SchemeLabel: "https", model.MetricsPathLabel: "/metrics", model.JobLabel: "job", - }, + }), }, // Apply relabeling. { - in: model.LabelSet{ + in: labels.FromMap(map[string]string{ model.AddressLabel: "1.2.3.4:1000", "custom": "value", - }, + }), cfg: &config.ScrapeConfig{ Scheme: "https", MetricsPath: "/metrics", diff --git a/rules/manager.go b/rules/manager.go index 9d211bd8c7..d24b4eec2c 100644 --- a/rules/manager.go +++ b/rules/manager.go @@ -20,7 +20,6 @@ import ( "path/filepath" "sync" "time" - "unsafe" html_template "html/template" @@ -33,7 +32,6 @@ import ( "github.com/prometheus/prometheus/notifier" "github.com/prometheus/prometheus/promql" "github.com/prometheus/prometheus/storage" - "github.com/prometheus/prometheus/storage/local" "github.com/prometheus/prometheus/util/strutil" ) @@ -150,10 +148,7 @@ func (g *Group) run() { iter := func() { iterationsScheduled.Inc() - if g.opts.SampleAppender.NeedsThrottling() { - iterationsSkipped.Inc() - return - } + start := time.Now() g.Eval() @@ -281,19 +276,12 @@ func (g *Group) Eval() { ) for _, s := range vector { - // TODO(fabxc): adjust after reworking appending. - var ms model.Sample - lbls := s.Metric.Map() - ms.Metric = *(*model.Metric)(unsafe.Pointer(&lbls)) - ms.Timestamp = model.Time(s.T) - ms.Value = model.SampleValue(s.V) - - if err := g.opts.SampleAppender.Append(&ms); err != nil { + if err := g.opts.SampleAppender.Add(s.Metric, s.T, s.V); err != nil { switch err { - case local.ErrOutOfOrderSample: + case storage.ErrOutOfOrderSample: numOutOfOrder++ log.With("sample", s).With("error", err).Debug("Rule evaluation result discarded") - case local.ErrDuplicateSampleForTimestamp: + case storage.ErrDuplicateSampleForTimestamp: numDuplicates++ log.With("sample", s).With("error", err).Debug("Rule evaluation result discarded") default: @@ -307,6 +295,9 @@ func (g *Group) Eval() { if numDuplicates > 0 { log.With("numDropped", numDuplicates).Warn("Error on ingesting results from rule evaluation with different value but same timestamp") } + if err := g.opts.SampleAppender.Commit(); err != nil { + log.With("err", err).Warn("rule sample appending failed") + } }(rule) } wg.Wait() @@ -356,7 +347,7 @@ type ManagerOptions struct { QueryEngine *promql.Engine Context context.Context Notifier *notifier.Notifier - SampleAppender storage.SampleAppender + SampleAppender storage.Appender } // NewManager returns an implementation of Manager, ready to be started diff --git a/storage/interface.go b/storage/interface.go index 911dc7d6f4..98b47cbf84 100644 --- a/storage/interface.go +++ b/storage/interface.go @@ -52,7 +52,7 @@ type Querier interface { // Appender provides batched appends against a storage. type Appender interface { // Add adds a sample pair for the referenced series. - Add(lset labels.Labels, t int64, v float64) + Add(lset labels.Labels, t int64, v float64) error // Commit submits the collected samples and purges the batch. Commit() error diff --git a/storage/tsdb/tsdb.go b/storage/tsdb/tsdb.go index 1e5416e442..e49b3c6ced 100644 --- a/storage/tsdb/tsdb.go +++ b/storage/tsdb/tsdb.go @@ -24,7 +24,6 @@ func Open(path string) (storage.Storage, error) { } func (a adapter) Querier(mint, maxt int64) (storage.Querier, error) { - // fmt.Println("new querier at", timestamp.Time(mint), timestamp.Time(maxt), maxt-mint) return querier{q: a.db.Querier(mint, maxt)}, nil } @@ -74,9 +73,8 @@ type appender struct { a tsdb.Appender } -func (a appender) Add(lset labels.Labels, t int64, v float64) { - // fmt.Println("add", lset, timestamp.Time(t), v) - a.a.Add(toTSDBLabels(lset), t, v) +func (a appender) Add(lset labels.Labels, t int64, v float64) error { + return a.a.Add(toTSDBLabels(lset), t, v) } func (a appender) Commit() error { return a.a.Commit() } diff --git a/web/api/v1/api.go b/web/api/v1/api.go index f011cfe65a..e9063ae420 100644 --- a/web/api/v1/api.go +++ b/web/api/v1/api.go @@ -19,7 +19,6 @@ import ( "fmt" "math" "net/http" - "sort" "strconv" "time" @@ -28,9 +27,10 @@ import ( "github.com/prometheus/common/route" "golang.org/x/net/context" + "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/promql" "github.com/prometheus/prometheus/retrieval" - "github.com/prometheus/prometheus/storage/local" + "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/util/httputil" ) @@ -90,7 +90,7 @@ type apiFunc func(r *http.Request) (interface{}, *apiError) // API can register a set of endpoints in a router and handle // them using the provided storage and query engine. type API struct { - Storage local.Storage + Storage storage.Storage QueryEngine *promql.Engine targetRetriever targetRetriever @@ -100,7 +100,7 @@ type API struct { } // NewAPI returns an initialized API type. -func NewAPI(qe *promql.Engine, st local.Storage, tr targetRetriever) *API { +func NewAPI(qe *promql.Engine, st storage.Storage, tr targetRetriever) *API { return &API{ QueryEngine: qe, Storage: st, @@ -229,6 +229,7 @@ func (api *API) queryRange(r *http.Request) (interface{}, *apiError) { } return nil, &apiError{errorExec, res.Err} } + return &queryData{ ResultType: res.Value.Type(), Result: res.Value, @@ -241,17 +242,17 @@ func (api *API) labelValues(r *http.Request) (interface{}, *apiError) { if !model.LabelNameRE.MatchString(name) { return nil, &apiError{errorBadData, fmt.Errorf("invalid label name: %q", name)} } - q, err := api.Storage.Querier() + q, err := api.Storage.Querier(math.MinInt64, math.MaxInt64) if err != nil { return nil, &apiError{errorExec, err} } defer q.Close() - vals, err := q.LabelValuesForLabelName(api.context(r), model.LabelName(name)) + // TODO(fabxc): add back request context. + vals, err := q.LabelValues(name) if err != nil { return nil, &apiError{errorExec, err} } - sort.Sort(vals) return vals, nil } @@ -284,7 +285,7 @@ func (api *API) series(r *http.Request) (interface{}, *apiError) { end = time.Unix(math.MaxInt64, 0) } - var matcherSets [][]*promql.LabelMatcher + var matcherSets [][]*labels.Matcher for _, s := range r.Form["match[]"] { matchers, err := promql.ParseMetricSelector(s) if err != nil { @@ -347,9 +348,9 @@ func (api *API) dropSeries(r *http.Request) (interface{}, *apiError) { type Target struct { // Labels before any processing. - DiscoveredLabels model.LabelSet `json:"discoveredLabels"` + DiscoveredLabels map[string]string `json:"discoveredLabels"` // Any labels that are added to this target and its metrics. - Labels model.LabelSet `json:"labels"` + Labels map[string]string `json:"labels"` ScrapeUrl string `json:"scrapeUrl"` @@ -370,8 +371,8 @@ func (api *API) targets(r *http.Request) (interface{}, *apiError) { } res[i] = &Target{ - DiscoveredLabels: t.DiscoveredLabels(), - Labels: t.Labels(), + DiscoveredLabels: t.DiscoveredLabels().Map(), + Labels: t.Labels().Map(), ScrapeUrl: t.URL().String(), LastError: lastErrStr, LastScrape: t.LastScrape(), diff --git a/web/federate.go b/web/federate.go index 00e8454032..e08c2b4e4d 100644 --- a/web/federate.go +++ b/web/federate.go @@ -21,6 +21,7 @@ import ( "github.com/prometheus/common/expfmt" "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/promql" ) @@ -37,7 +38,7 @@ func (h *Handler) federation(w http.ResponseWriter, req *http.Request) { req.ParseForm() - var matcherSets [][]*promql.LabelMatcher + var matcherSets [][]*labels.Matcher for _, s := range req.Form["match[]"] { matchers, err := promql.ParseMetricSelector(s) if err != nil { diff --git a/web/web.go b/web/web.go index 97dd0b0a42..4b6bc3dae2 100644 --- a/web/web.go +++ b/web/web.go @@ -44,7 +44,7 @@ import ( "github.com/prometheus/prometheus/promql" "github.com/prometheus/prometheus/retrieval" "github.com/prometheus/prometheus/rules" - "github.com/prometheus/prometheus/storage/local" + "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/template" "github.com/prometheus/prometheus/util/httputil" api_v1 "github.com/prometheus/prometheus/web/api/v1" @@ -59,7 +59,7 @@ type Handler struct { ruleManager *rules.Manager queryEngine *promql.Engine context context.Context - storage local.Storage + storage storage.Storage notifier *notifier.Notifier apiV1 *api_v1.API @@ -104,7 +104,7 @@ type PrometheusVersion struct { // Options for the web Handler. type Options struct { Context context.Context - Storage local.Storage + Storage storage.Storage QueryEngine *promql.Engine TargetManager *retrieval.TargetManager RuleManager *rules.Manager @@ -375,7 +375,7 @@ func (h *Handler) targets(w http.ResponseWriter, r *http.Request) { // Bucket targets by job label tps := map[string][]retrieval.Target{} for _, t := range h.targetManager.Targets() { - job := string(t.Labels()[model.JobLabel]) + job := t.Labels().Get(model.JobLabel) tps[job] = append(tps[job], t) }