// Copyright 2016 The Prometheus Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package retrieval import ( "errors" "fmt" "io" "net/http" "sync" "time" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/expfmt" "github.com/prometheus/common/log" "github.com/prometheus/common/model" "golang.org/x/net/context" "golang.org/x/net/context/ctxhttp" "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/storage/local" ) const ( scrapeHealthMetricName = "up" scrapeDurationMetricName = "scrape_duration_seconds" // Capacity of the channel to buffer samples during ingestion. ingestedSamplesCap = 256 // Constants for instrumentation. namespace = "prometheus" interval = "interval" scrapeJob = "scrape_job" ) var ( errSkippedScrape = errors.New("scrape skipped due to throttled ingestion") targetIntervalLength = prometheus.NewSummaryVec( prometheus.SummaryOpts{ Namespace: namespace, Name: "target_interval_length_seconds", Help: "Actual intervals between scrapes.", Objectives: map[float64]float64{0.01: 0.001, 0.05: 0.005, 0.5: 0.05, 0.90: 0.01, 0.99: 0.001}, }, []string{interval}, ) targetSkippedScrapes = prometheus.NewCounterVec( prometheus.CounterOpts{ Namespace: namespace, Name: "target_skipped_scrapes_total", Help: "Total number of scrapes that were skipped because the metric storage was throttled.", }, []string{interval}, ) targetReloadIntervalLength = prometheus.NewSummaryVec( prometheus.SummaryOpts{ Namespace: namespace, Name: "target_reload_length_seconds", Help: "Actual interval to reload the scrape pool with a given configuration.", Objectives: map[float64]float64{0.01: 0.001, 0.05: 0.005, 0.5: 0.05, 0.90: 0.01, 0.99: 0.001}, }, []string{interval}, ) targetSyncIntervalLength = prometheus.NewSummaryVec( prometheus.SummaryOpts{ Namespace: namespace, Name: "target_sync_length_seconds", Help: "Actual interval to sync the scrape pool.", Objectives: map[float64]float64{0.01: 0.001, 0.05: 0.005, 0.5: 0.05, 0.90: 0.01, 0.99: 0.001}, }, []string{scrapeJob}, ) targetScrapePoolSyncsCounter = prometheus.NewCounterVec( prometheus.CounterOpts{ Namespace: namespace, Name: "target_scrape_pool_sync_total", Help: "Total number of syncs that were executed on a scrape pool.", }, []string{scrapeJob}, ) ) func init() { prometheus.MustRegister(targetIntervalLength) prometheus.MustRegister(targetSkippedScrapes) prometheus.MustRegister(targetReloadIntervalLength) prometheus.MustRegister(targetSyncIntervalLength) prometheus.MustRegister(targetScrapePoolSyncsCounter) } // scrapePool manages scrapes for sets of targets. type scrapePool struct { appender storage.SampleAppender ctx context.Context mtx sync.RWMutex config *config.ScrapeConfig client *http.Client // Targets and loops must always be synchronized to have the same // set of hashes. targets map[uint64]*Target loops map[uint64]loop // Constructor for new scrape loops. This is settable for testing convenience. newLoop func(context.Context, scraper, storage.SampleAppender, storage.SampleAppender) loop } func newScrapePool(cfg *config.ScrapeConfig, app storage.SampleAppender) *scrapePool { client, err := newHTTPClient(cfg) if err != nil { // Any errors that could occur here should be caught during config validation. log.Errorf("Error creating HTTP client for job %q: %s", cfg.JobName, err) } return &scrapePool{ appender: app, config: cfg, client: client, targets: map[uint64]*Target{}, loops: map[uint64]loop{}, newLoop: newScrapeLoop, } } // stop terminates all scrape loops and returns after they all terminated. func (sp *scrapePool) stop() { var wg sync.WaitGroup sp.mtx.Lock() defer sp.mtx.Unlock() for fp, l := range sp.loops { wg.Add(1) go func(l loop) { l.stop() wg.Done() }(l) delete(sp.loops, fp) delete(sp.targets, fp) } wg.Wait() } // reload the scrape pool with the given scrape configuration. The target state is preserved // but all scrape loops are restarted with the new scrape configuration. // This method returns after all scrape loops that were stopped have fully terminated. func (sp *scrapePool) reload(cfg *config.ScrapeConfig) { start := time.Now() sp.mtx.Lock() defer sp.mtx.Unlock() client, err := newHTTPClient(cfg) if err != nil { // Any errors that could occur here should be caught during config validation. log.Errorf("Error creating HTTP client for job %q: %s", cfg.JobName, err) } sp.config = cfg sp.client = client var ( wg sync.WaitGroup interval = time.Duration(sp.config.ScrapeInterval) timeout = time.Duration(sp.config.ScrapeTimeout) ) for fp, oldLoop := range sp.loops { var ( t = sp.targets[fp] s = &targetScraper{Target: t, client: sp.client} newLoop = sp.newLoop(sp.ctx, s, sp.sampleAppender(t), sp.reportAppender(t)) ) wg.Add(1) go func(oldLoop, newLoop loop) { oldLoop.stop() wg.Done() go newLoop.run(interval, timeout, nil) }(oldLoop, newLoop) sp.loops[fp] = newLoop } wg.Wait() targetReloadIntervalLength.WithLabelValues(interval.String()).Observe( float64(time.Since(start)) / float64(time.Second), ) } // sync takes a list of potentially duplicated targets, deduplicates them, starts // scrape loops for new targets, and stops scrape loops for disappeared targets. // It returns after all stopped scrape loops terminated. func (sp *scrapePool) sync(targets []*Target) { start := time.Now() sp.mtx.Lock() defer sp.mtx.Unlock() var ( uniqueTargets = map[uint64]struct{}{} interval = time.Duration(sp.config.ScrapeInterval) timeout = time.Duration(sp.config.ScrapeTimeout) ) for _, t := range targets { hash := t.hash() uniqueTargets[hash] = struct{}{} if _, ok := sp.targets[hash]; !ok { s := &targetScraper{Target: t, client: sp.client} l := sp.newLoop(sp.ctx, s, sp.sampleAppender(t), sp.reportAppender(t)) sp.targets[hash] = t sp.loops[hash] = l go l.run(interval, timeout, nil) } } var wg sync.WaitGroup // Stop and remove old targets and scraper loops. for hash := range sp.targets { if _, ok := uniqueTargets[hash]; !ok { wg.Add(1) go func(l loop) { l.stop() wg.Done() }(sp.loops[hash]) delete(sp.loops, hash) delete(sp.targets, hash) } } // Wait for all potentially stopped scrapers to terminate. // This covers the case of flapping targets. If the server is under high load, a new scraper // may be active and tries to insert. The old scraper that didn't terminate yet could still // be inserting a previous sample set. wg.Wait() targetSyncIntervalLength.WithLabelValues(sp.config.JobName).Observe( float64(time.Since(start)) / float64(time.Second), ) targetScrapePoolSyncsCounter.WithLabelValues(sp.config.JobName).Inc() } // sampleAppender returns an appender for ingested samples from the target. func (sp *scrapePool) sampleAppender(target *Target) storage.SampleAppender { app := sp.appender // The relabelAppender has to be inside the label-modifying appenders // so the relabeling rules are applied to the correct label set. if mrc := sp.config.MetricRelabelConfigs; len(mrc) > 0 { app = relabelAppender{ SampleAppender: app, relabelings: mrc, } } if sp.config.HonorLabels { app = honorLabelsAppender{ SampleAppender: app, labels: target.Labels(), } } else { app = ruleLabelsAppender{ SampleAppender: app, labels: target.Labels(), } } return app } // reportAppender returns an appender for reporting samples for the target. func (sp *scrapePool) reportAppender(target *Target) storage.SampleAppender { return ruleLabelsAppender{ SampleAppender: sp.appender, labels: target.Labels(), } } // A scraper retrieves samples and accepts a status report at the end. type scraper interface { scrape(ctx context.Context, ts time.Time) (model.Samples, error) report(start time.Time, dur time.Duration, err error) offset(interval time.Duration) time.Duration } // targetScraper implements the scraper interface for a target. type targetScraper struct { *Target client *http.Client } const acceptHeader = `application/vnd.google.protobuf;proto=io.prometheus.client.MetricFamily;encoding=delimited;q=0.7,text/plain;version=0.0.4;q=0.3,application/json;schema="prometheus/telemetry";version=0.0.2;q=0.2,*/*;q=0.1` func (s *targetScraper) scrape(ctx context.Context, ts time.Time) (model.Samples, error) { req, err := http.NewRequest("GET", s.URL().String(), nil) if err != nil { return nil, err } req.Header.Add("Accept", acceptHeader) resp, err := ctxhttp.Do(ctx, s.client, req) if err != nil { return nil, err } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { return nil, fmt.Errorf("server returned HTTP status %s", resp.Status) } var ( allSamples = make(model.Samples, 0, 200) decSamples = make(model.Vector, 0, 50) ) sdec := expfmt.SampleDecoder{ Dec: expfmt.NewDecoder(resp.Body, expfmt.ResponseFormat(resp.Header)), Opts: &expfmt.DecodeOptions{ Timestamp: model.TimeFromUnixNano(ts.UnixNano()), }, } for { if err = sdec.Decode(&decSamples); err != nil { break } allSamples = append(allSamples, decSamples...) decSamples = decSamples[:0] } if err == io.EOF { // Set err to nil since it is used in the scrape health recording. err = nil } return allSamples, err } // A loop can run and be stopped again. It must not be reused after it was stopped. type loop interface { run(interval, timeout time.Duration, errc chan<- error) stop() } type scrapeLoop struct { scraper scraper appender storage.SampleAppender reportAppender storage.SampleAppender done chan struct{} ctx context.Context cancel func() } func newScrapeLoop(ctx context.Context, sc scraper, app, reportApp storage.SampleAppender) loop { sl := &scrapeLoop{ scraper: sc, appender: app, reportAppender: reportApp, done: make(chan struct{}), } sl.ctx, sl.cancel = context.WithCancel(ctx) return sl } func (sl *scrapeLoop) run(interval, timeout time.Duration, errc chan<- error) { defer close(sl.done) select { case <-time.After(sl.scraper.offset(interval)): // Continue after a scraping offset. case <-sl.ctx.Done(): return } var last time.Time ticker := time.NewTicker(interval) defer ticker.Stop() for { select { case <-sl.ctx.Done(): return default: } if !sl.appender.NeedsThrottling() { var ( start = time.Now() scrapeCtx, _ = context.WithTimeout(sl.ctx, timeout) ) // Only record after the first scrape. if !last.IsZero() { targetIntervalLength.WithLabelValues(interval.String()).Observe( float64(time.Since(last)) / float64(time.Second), // Sub-second precision. ) } samples, err := sl.scraper.scrape(scrapeCtx, start) if err == nil { sl.append(samples) } else if errc != nil { errc <- err } sl.report(start, time.Since(start), err) last = start } else { targetSkippedScrapes.WithLabelValues(interval.String()).Inc() } select { case <-sl.ctx.Done(): return case <-ticker.C: } } } func (sl *scrapeLoop) stop() { sl.cancel() <-sl.done } func (sl *scrapeLoop) append(samples model.Samples) { numOutOfOrder := 0 for _, s := range samples { if err := sl.appender.Append(s); err != nil { if err == local.ErrOutOfOrderSample { numOutOfOrder++ } else { log.Warnf("Error inserting sample: %s", err) } } } if numOutOfOrder > 0 { log.With("numDropped", numOutOfOrder).Warn("Error on ingesting out-of-order samples") } } func (sl *scrapeLoop) report(start time.Time, duration time.Duration, err error) { sl.scraper.report(start, duration, err) ts := model.TimeFromUnixNano(start.UnixNano()) var health model.SampleValue if err == nil { health = 1 } healthSample := &model.Sample{ Metric: model.Metric{ model.MetricNameLabel: scrapeHealthMetricName, }, Timestamp: ts, Value: health, } durationSample := &model.Sample{ Metric: model.Metric{ model.MetricNameLabel: scrapeDurationMetricName, }, Timestamp: ts, Value: model.SampleValue(float64(duration) / float64(time.Second)), } sl.reportAppender.Append(healthSample) sl.reportAppender.Append(durationSample) }