diff --git a/pkg/textparse/cpu.prof b/pkg/textparse/cpu.prof
deleted file mode 100644
index e64413e9d2..0000000000
Binary files a/pkg/textparse/cpu.prof and /dev/null differ
diff --git a/pkg/textparse/textparse.test b/pkg/textparse/textparse.test
deleted file mode 100755
index 41a5c267b3..0000000000
Binary files a/pkg/textparse/textparse.test and /dev/null differ
diff --git a/retrieval/scrape.go b/retrieval/scrape.go
index e41f615fb5..e4694b8a1e 100644
--- a/retrieval/scrape.go
+++ b/retrieval/scrape.go
@@ -14,22 +14,22 @@ package retrieval
 
 import (
+	"bytes"
 	"fmt"
 	"io"
 	"net/http"
 	"sync"
 	"time"
-	"unsafe"
 
 	"github.com/prometheus/client_golang/prometheus"
-	"github.com/prometheus/common/expfmt"
 	"github.com/prometheus/common/log"
-	"github.com/prometheus/common/model"
 	"golang.org/x/net/context"
 	"golang.org/x/net/context/ctxhttp"
 
 	"github.com/prometheus/prometheus/config"
 	"github.com/prometheus/prometheus/pkg/labels"
+	"github.com/prometheus/prometheus/pkg/textparse"
+	"github.com/prometheus/prometheus/pkg/timestamp"
 	"github.com/prometheus/prometheus/storage"
 )
 
@@ -322,7 +322,7 @@ func (sp *scrapePool) reportAppender(target *Target) storage.Appender {
 
 // A scraper retrieves samples and accepts a status report at the end.
 type scraper interface {
-	scrape(ctx context.Context, ts time.Time) (samples, error)
+	scrape(ctx context.Context, w io.Writer) error
 	report(start time.Time, dur time.Duration, err error)
 	offset(interval time.Duration) time.Duration
 }
@@ -335,53 +335,41 @@ type targetScraper struct {
 
 const acceptHeader = `application/vnd.google.protobuf;proto=io.prometheus.client.MetricFamily;encoding=delimited;q=0.7,text/plain;version=0.0.4;q=0.3,*/*;q=0.1`
 
-func (s *targetScraper) scrape(ctx context.Context, ts time.Time) (samples, error) {
+var scrapeBufPool = sync.Pool{}
+
+func getScrapeBuf() []byte {
+	b := scrapeBufPool.Get()
+	if b == nil {
+		return make([]byte, 0, 8192)
+	}
+	return b.([]byte)
+}
+
+func putScrapeBuf(b []byte) {
+	b = b[:0]
+	scrapeBufPool.Put(b)
+}
+
+func (s *targetScraper) scrape(ctx context.Context, w io.Writer) error {
 	req, err := http.NewRequest("GET", s.URL().String(), nil)
 	if err != nil {
-		return nil, err
+		return err
 	}
-	req.Header.Add("Accept", acceptHeader)
+	// Disable accept header to always negotiate for text format.
+	// req.Header.Add("Accept", acceptHeader)
 
 	resp, err := ctxhttp.Do(ctx, s.client, req)
 	if err != nil {
-		return nil, err
+		return err
 	}
 	defer resp.Body.Close()
 
 	if resp.StatusCode != http.StatusOK {
-		return nil, fmt.Errorf("server returned HTTP status %s", resp.Status)
+		return fmt.Errorf("server returned HTTP status %s", resp.Status)
 	}
 
-	var (
-		allSamples = make(samples, 0, 200)
-		decSamples = make(model.Vector, 0, 50)
-	)
-	sdec := expfmt.SampleDecoder{
-		Dec: expfmt.NewDecoder(resp.Body, expfmt.ResponseFormat(resp.Header)),
-		Opts: &expfmt.DecodeOptions{
-			Timestamp: model.TimeFromUnixNano(ts.UnixNano()),
-		},
-	}
-
-	for {
-		if err = sdec.Decode(&decSamples); err != nil {
-			break
-		}
-		for _, s := range decSamples {
-			allSamples = append(allSamples, sample{
-				metric: labels.FromMap(*(*map[string]string)(unsafe.Pointer(&s.Metric))),
-				t:      int64(s.Timestamp),
-				v:      float64(s.Value),
-			})
-		}
-		decSamples = decSamples[:0]
-	}
-
-	if err == io.EOF {
-		// Set err to nil since it is used in the scrape health recording.
-		err = nil
-	}
-	return allSamples, err
+	_, err = io.Copy(w, resp.Body)
+	return err
 }
 
 // A loop can run and be stopped again. It must not be reused after it was stopped.
@@ -396,6 +384,8 @@ type scrapeLoop struct {
 	appender       func() storage.Appender
 	reportAppender func() storage.Appender
 
+	cache map[string]uint64
+
 	done   chan struct{}
 	ctx    context.Context
 	cancel func()
@@ -406,6 +396,7 @@ func newScrapeLoop(ctx context.Context, sc scraper, app, reportApp func() storag
 		scraper:        sc,
 		appender:       app,
 		reportAppender: reportApp,
+		cache:          map[string]uint64{},
 		done:           make(chan struct{}),
 	}
 	sl.ctx, sl.cancel = context.WithCancel(ctx)
@@ -447,14 +438,22 @@ func (sl *scrapeLoop) run(interval, timeout time.Duration, errc chan<- error) {
 			)
 		}
 
-		samples, err := sl.scraper.scrape(scrapeCtx, start)
+		n := 0
+		buf := bytes.NewBuffer(getScrapeBuf())
+
+		err := sl.scraper.scrape(scrapeCtx, buf)
 		if err == nil {
-			sl.append(samples)
+			b := buf.Bytes()
+
+			if n, err = sl.append(b, start); err != nil {
+				log.With("err", err).Error("append failed")
+			}
+			putScrapeBuf(b)
 		} else if errc != nil {
 			errc <- err
 		}
 
-		sl.report(start, time.Since(start), len(samples), err)
+		sl.report(start, time.Since(start), n, err)
 		last = start
 
 		select {
@@ -491,48 +490,60 @@ func (s samples) Less(i, j int) bool {
 	return s[i].t < s[j].t
 }
 
-func (sl *scrapeLoop) append(samples samples) {
+func (sl *scrapeLoop) append(b []byte, ts time.Time) (n int, err error) {
 	var (
-		numOutOfOrder = 0
-		numDuplicates = 0
+		app     = sl.appender()
+		p       = textparse.New(b)
+		defTime = timestamp.FromTime(ts)
 	)
-	app := sl.appender()
 
-	for _, s := range samples {
-		ref, err := app.SetSeries(s.metric)
-		if err != nil {
-			log.With("sample", s).With("error", err).Debug("Setting metric failed")
-			continue
+	for p.Next() {
+		t := defTime
+		met, tp, v := p.At()
+		if tp != nil {
+			t = *tp
 		}
-		if err := app.Add(ref, s.t, s.v); err != nil {
-			switch err {
-			case storage.ErrOutOfOrderSample:
-				numOutOfOrder++
-				log.With("sample", s).With("error", err).Debug("Sample discarded")
-			case storage.ErrDuplicateSampleForTimestamp:
-				numDuplicates++
-				log.With("sample", s).With("error", err).Debug("Sample discarded")
-			default:
-				log.With("sample", s).With("error", err).Warn("Sample discarded")
+
+		mets := string(met)
+		ref, ok := sl.cache[mets]
+		if ok {
+			if err = app.Add(ref, t, v); err != nil && err != storage.ErrNotFound {
+				break
+			}
+			// On ErrNotFound the cached reference is stale; establish the series anew.
+			ok = err == nil
+		}
+		if !ok {
+			var lset labels.Labels
+			p.Metric(&lset)
+
+			ref, err = app.SetSeries(lset)
+			if err != nil {
+				break
+			}
+			if err = app.Add(ref, t, v); err != nil {
+				break
 			}
 		}
+		sl.cache[mets] = ref
+		n++
 	}
-	if numOutOfOrder > 0 {
-		log.With("numDropped", numOutOfOrder).Warn("Error on ingesting out-of-order samples")
+	if err == nil {
+		err = p.Err()
 	}
-	if numDuplicates > 0 {
-		log.With("numDropped", numDuplicates).Warn("Error on ingesting samples with different value but same timestamp")
+	if err != nil {
+		app.Rollback()
+		return 0, err
 	}
-
 	if err := app.Commit(); err != nil {
-		log.With("err", err).Warn("Error commiting scrape")
+		return 0, err
 	}
+	return n, nil
 }
 
-func (sl *scrapeLoop) report(start time.Time, duration time.Duration, scrapedSamples int, err error) {
+func (sl *scrapeLoop) report(start time.Time, duration time.Duration, scrapedSamples int, err error) error {
 	sl.scraper.report(start, duration, err)
 
-	ts := int64(model.TimeFromUnixNano(start.UnixNano()))
+	ts := timestamp.FromTime(start)
 
 	var health float64
 	if err == nil {
@@ -541,41 +551,40 @@ func (sl *scrapeLoop) report(start time.Time, duration time.Duration, scrapedSam
 
 	app := sl.reportAppender()
 
-	var (
-		healthMet = labels.Labels{
-			labels.Label{Name: labels.MetricName, Value: scrapeHealthMetricName},
-		}
-		durationMet = labels.Labels{
-			labels.Label{Name: labels.MetricName, Value: scrapeDurationMetricName},
-		}
-		countMet = labels.Labels{
-			labels.Label{Name: labels.MetricName, Value: scrapeSamplesMetricName},
-		}
-	)
-
-	ref, err := app.SetSeries(healthMet)
-	if err != nil {
-		panic(err)
+	if err := sl.addReportSample(app, scrapeHealthMetricName, ts, health); err != nil {
+		app.Rollback()
+		return err
 	}
-	if err := app.Add(ref, ts, health); err != nil {
-		log.With("err", err).Warn("Scrape health sample discarded")
+	if err := sl.addReportSample(app, scrapeDurationMetricName, ts, duration.Seconds()); err != nil {
+		app.Rollback()
+		return err
 	}
-	ref, err = app.SetSeries(durationMet)
-	if err != nil {
-		panic(err)
-	}
-	if err := app.Add(ref, ts, duration.Seconds()); err != nil {
-		log.With("err", err).Warn("Scrape duration sample discarded")
-	}
-	ref, err = app.SetSeries(countMet)
-	if err != nil {
-		panic(err)
-	}
-	if err := app.Add(ref, ts, float64(scrapedSamples)); err != nil {
-		log.With("err", err).Warn("Scrape sample count sample discarded")
-	}
-
-	if err := app.Commit(); err != nil {
-		log.With("err", err).Warn("Commiting report samples failed")
+	if err := sl.addReportSample(app, scrapeSamplesMetricName, ts, float64(scrapedSamples)); err != nil {
+		app.Rollback()
+		return err
 	}
+	return app.Commit()
+}
+
+func (sl *scrapeLoop) addReportSample(app storage.Appender, s string, t int64, v float64) error {
+	ref, ok := sl.cache[s]
+
+	if ok {
+		if err := app.Add(ref, t, v); err != storage.ErrNotFound {
+			return err
+		}
+	}
+	met := labels.Labels{
+		labels.Label{Name: labels.MetricName, Value: s},
+	}
+	ref, err := app.SetSeries(met)
+	if err != nil {
+		return err
+	}
+	if err = app.Add(ref, t, v); err != nil {
+		return err
+	}
+	sl.cache[s] = ref
+
+	return nil
 }
diff --git a/retrieval/scrape_test.go b/retrieval/scrape_test.go
index 1ff14254b6..a4f33b797a 100644
--- a/retrieval/scrape_test.go
+++ b/retrieval/scrape_test.go
@@ -14,23 +14,25 @@ package retrieval
 
 import (
+	"bytes"
 	"fmt"
+	"io"
+	"io/ioutil"
 	"net/http"
 	"net/http/httptest"
 	"net/url"
 	"reflect"
-	"sort"
 	"strings"
 	"sync"
 	"testing"
 	"time"
 
 	"github.com/prometheus/common/model"
+	"github.com/stretchr/testify/require"
 	"golang.org/x/net/context"
 
 	"github.com/prometheus/prometheus/config"
 	"github.com/prometheus/prometheus/pkg/labels"
-	"github.com/prometheus/prometheus/pkg/timestamp"
 	"github.com/prometheus/prometheus/storage"
 )
 
@@ -321,9 +323,9 @@ func TestScrapeLoopStop(t *testing.T) {
 	}
 
 	// Running the scrape loop must exit before calling the scraper even once.
-	scraper.scrapeFunc = func(context.Context, time.Time) (samples, error) {
+	scraper.scrapeFunc = func(context.Context, io.Writer) error {
 		t.Fatalf("scraper was called for terminated scrape loop")
-		return nil, nil
+		return nil
 	}
 
 	runDone := make(chan struct{})
@@ -385,13 +387,13 @@ func TestScrapeLoopRun(t *testing.T) {
 	scraper.offsetDur = 0
 
 	block := make(chan struct{})
-	scraper.scrapeFunc = func(ctx context.Context, ts time.Time) (samples, error) {
+	scraper.scrapeFunc = func(ctx context.Context, _ io.Writer) error {
 		select {
 		case <-block:
 		case <-ctx.Done():
-			return nil, ctx.Err()
+			return ctx.Err()
 		}
-		return nil, nil
+		return nil
 	}
 
 	ctx, cancel = context.WithCancel(context.Background())
@@ -450,33 +452,12 @@ func TestTargetScraperScrapeOK(t *testing.T) {
 		},
 		client: http.DefaultClient,
 	}
-	now := time.Now()
+	var buf bytes.Buffer
 
-	smpls, err := ts.scrape(context.Background(), now)
-	if err != nil {
+	if err := ts.scrape(context.Background(), &buf); err != nil {
 		t.Fatalf("Unexpected scrape error: %s", err)
 	}
-
-	expectedSamples := samples{
-		sample{
-			metric: labels.FromStrings(labels.MetricName, "metric_a"),
-			t:      timestamp.FromTime(now),
-			v:      1,
-		},
-		sample{
-			metric: labels.FromStrings(labels.MetricName, "metric_b"),
-			t:      timestamp.FromTime(now),
-			v:      2,
-		},
-	}
-	sort.Sort(expectedSamples)
-	sort.Sort(smpls)
-
-	if !reflect.DeepEqual(smpls, expectedSamples) {
-		t.Errorf("Scraped samples did not match served metrics")
-		t.Errorf("Expected: %v", expectedSamples)
-		t.Fatalf("Got: %v", smpls)
-	}
+	require.Equal(t, "metric_a 1\nmetric_b 2\n", buf.String())
 }
 
 func TestTargetScrapeScrapeCancel(t *testing.T) {
@@ -513,7 +494,7 @@ func TestTargetScrapeScrapeCancel(t *testing.T) {
 	}()
 
 	go func() {
-		if _, err := ts.scrape(ctx, time.Now()); err != context.Canceled {
+		if err := ts.scrape(ctx, ioutil.Discard); err != context.Canceled {
 			errc <- fmt.Errorf("Expected context cancelation error but got: %s", err)
 		}
 		close(errc)
@@ -555,7 +536,7 @@ func TestTargetScrapeScrapeNotFound(t *testing.T) {
 		client: http.DefaultClient,
 	}
 
-	if _, err := ts.scrape(context.Background(), time.Now()); !strings.Contains(err.Error(), "404") {
+	if err := ts.scrape(context.Background(), ioutil.Discard); !strings.Contains(err.Error(), "404") {
 		t.Fatalf("Expected \"404 NotFound\" error but got: %s", err)
 	}
 }
@@ -571,7 +552,7 @@ type testScraper struct {
 	samples   samples
 	scrapeErr error
 
-	scrapeFunc func(context.Context, time.Time) (samples, error)
+	scrapeFunc func(context.Context, io.Writer) error
 }
 
 func (ts *testScraper) offset(interval time.Duration) time.Duration {
@@ -584,9 +565,9 @@ func (ts *testScraper) report(start time.Time, duration time.Duration, err error
 	ts.lastError = err
 }
 
-func (ts *testScraper) scrape(ctx context.Context, t time.Time) (samples, error) {
+func (ts *testScraper) scrape(ctx context.Context, w io.Writer) error {
 	if ts.scrapeFunc != nil {
-		return ts.scrapeFunc(ctx, t)
+		return ts.scrapeFunc(ctx, w)
 	}
-	return ts.samples, ts.scrapeErr
+	return ts.scrapeErr
 }
diff --git a/storage/interface.go b/storage/interface.go
index e35b00f92e..7c54231ccc 100644
--- a/storage/interface.go
+++ b/storage/interface.go
@@ -20,6 +20,7 @@ import (
 )
 
 var (
+	ErrNotFound                    = errors.New("not found")
 	ErrOutOfOrderSample            = errors.New("out of order sample")
 	ErrDuplicateSampleForTimestamp = errors.New("duplicate sample for timestamp")
 )
diff --git a/storage/tsdb/tsdb.go b/storage/tsdb/tsdb.go
index f24d5ac31c..01098c67cd 100644
--- a/storage/tsdb/tsdb.go
+++ b/storage/tsdb/tsdb.go
@@ -78,7 +78,17 @@ func (a appender) SetSeries(lset labels.Labels) (uint64, error) {
 }
 
 func (a appender) Add(ref uint64, t int64, v float64) error {
-	return a.a.Add(ref, t, v)
+	err := a.a.Add(ref, t, v)
+
+	switch err {
+	case tsdb.ErrNotFound:
+		return storage.ErrNotFound
+	case tsdb.ErrOutOfOrderSample:
+		return storage.ErrOutOfOrderSample
+	case tsdb.ErrAmendSample:
+		return storage.ErrDuplicateSampleForTimestamp
+	}
+	return err
}
 
 func (a appender) Commit() error { return a.a.Commit() }
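
The central pattern this patch introduces is the series-reference cache: try app.Add with a cached reference first, and only fall back to app.SetSeries when there is no reference yet or the cached one is reported stale via storage.ErrNotFound (the same idiom addReportSample uses). The standalone Go sketch below illustrates that fast-path/slow-path flow under stated assumptions: memAppender, errNotFound, and the add closure are invented stand-ins for illustration, not the real storage.Appender API.

package main

import (
	"errors"
	"fmt"
)

// errNotFound mirrors storage.ErrNotFound: the reference is unknown or stale.
var errNotFound = errors.New("not found")

// memAppender is a toy stand-in for the appender used by the scrape loop:
// SetSeries resolves a metric to a numeric reference, Add writes by reference.
type memAppender struct {
	series  map[uint64]string
	nextRef uint64
}

func (a *memAppender) SetSeries(metric string) (uint64, error) {
	a.nextRef++
	a.series[a.nextRef] = metric
	return a.nextRef, nil
}

func (a *memAppender) Add(ref uint64, t int64, v float64) error {
	if _, ok := a.series[ref]; !ok {
		return errNotFound // stale reference, e.g. after storage was truncated
	}
	fmt.Printf("append ref=%d t=%d v=%g (%s)\n", ref, t, v, a.series[ref])
	return nil
}

func main() {
	app := &memAppender{series: map[uint64]string{}}
	cache := map[string]uint64{} // scraped metric text -> series reference

	add := func(met string, t int64, v float64) error {
		if ref, ok := cache[met]; ok {
			err := app.Add(ref, t, v)
			if err != errNotFound {
				return err // fast path: nil on success, real errors bubble up
			}
			// The cached reference went stale; re-resolve below.
		}
		ref, err := app.SetSeries(met)
		if err != nil {
			return err
		}
		if err := app.Add(ref, t, v); err != nil {
			return err
		}
		cache[met] = ref // remember the reference for subsequent scrapes
		return nil
	}

	add(`http_requests_total{code="200"}`, 1000, 1) // miss: SetSeries + Add
	add(`http_requests_total{code="200"}`, 2000, 2) // hit: Add by cached ref only
}

Keying the cache on the raw metric text lets subsequent scrapes skip label parsing and series lookup entirely, which is where most of the old per-sample decoding cost went.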