mirror of
https://github.com/prometheus/prometheus.git
synced 2024-12-26 22:19:40 -08:00
47a673c3a0
The scrape manager's receiving channel now just saves the target sets, and another background runner updates the scrape loops every 5 seconds. This is so that the scrape manager doesn't block the receiving channel while it does the long background reloading of the scrape loops. Active and dropped targets are now saved in each scrape pool instead of in the scrape manager. This is mainly to avoid races when getting the targets via the web API. Reloading the scrape loops now happens in parallel, which reaches the final desired state faster and also speeds up Prometheus's shutdown. Also updated some function signatures in the web package for consistency. Signed-off-by: Krasi Georgiev <kgeorgie@redhat.com>
1203 lines
31 KiB
Go
1203 lines
31 KiB
Go
// Copyright 2016 The Prometheus Authors
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package scrape
|
|
|
|
import (
|
|
"bufio"
|
|
"bytes"
|
|
"compress/gzip"
|
|
"context"
|
|
"fmt"
|
|
"io"
|
|
"math"
|
|
"net/http"
|
|
"sync"
|
|
"time"
|
|
"unsafe"
|
|
|
|
"github.com/go-kit/kit/log"
|
|
"github.com/go-kit/kit/log/level"
|
|
"github.com/prometheus/client_golang/prometheus"
|
|
config_util "github.com/prometheus/common/config"
|
|
"github.com/prometheus/common/model"
|
|
"github.com/prometheus/common/version"
|
|
"golang.org/x/net/context/ctxhttp"
|
|
|
|
"github.com/prometheus/prometheus/config"
|
|
"github.com/prometheus/prometheus/discovery/targetgroup"
|
|
"github.com/prometheus/prometheus/pkg/labels"
|
|
"github.com/prometheus/prometheus/pkg/pool"
|
|
"github.com/prometheus/prometheus/pkg/relabel"
|
|
"github.com/prometheus/prometheus/pkg/textparse"
|
|
"github.com/prometheus/prometheus/pkg/timestamp"
|
|
"github.com/prometheus/prometheus/pkg/value"
|
|
"github.com/prometheus/prometheus/storage"
|
|
)
|
|
|
|
// Metrics describing the behavior of scrape pools and scrape loops.
// They are registered with the default registry in init.
var (
	// targetIntervalLength observes the time elapsed between the starts of
	// two consecutive scrapes of a loop, labeled by the configured interval.
	targetIntervalLength = prometheus.NewSummaryVec(
		prometheus.SummaryOpts{
			Name:       "prometheus_target_interval_length_seconds",
			Help:       "Actual intervals between scrapes.",
			Objectives: map[float64]float64{0.01: 0.001, 0.05: 0.005, 0.5: 0.05, 0.90: 0.01, 0.99: 0.001},
		},
		[]string{"interval"},
	)
	// targetReloadIntervalLength observes how long scrapePool.reload took,
	// labeled by the scrape interval of the new configuration.
	targetReloadIntervalLength = prometheus.NewSummaryVec(
		prometheus.SummaryOpts{
			Name:       "prometheus_target_reload_length_seconds",
			Help:       "Actual interval to reload the scrape pool with a given configuration.",
			Objectives: map[float64]float64{0.01: 0.001, 0.05: 0.005, 0.5: 0.05, 0.90: 0.01, 0.99: 0.001},
		},
		[]string{"interval"},
	)
	// targetSyncIntervalLength observes how long scrapePool.Sync took,
	// labeled by job name.
	targetSyncIntervalLength = prometheus.NewSummaryVec(
		prometheus.SummaryOpts{
			Name:       "prometheus_target_sync_length_seconds",
			Help:       "Actual interval to sync the scrape pool.",
			Objectives: map[float64]float64{0.01: 0.001, 0.05: 0.005, 0.5: 0.05, 0.90: 0.01, 0.99: 0.001},
		},
		[]string{"scrape_job"},
	)
	// targetScrapePoolSyncsCounter is incremented once per scrapePool.Sync
	// call, labeled by job name.
	targetScrapePoolSyncsCounter = prometheus.NewCounterVec(
		prometheus.CounterOpts{
			Name: "prometheus_target_scrape_pool_sync_total",
			Help: "Total number of syncs that were executed on a scrape pool.",
		},
		[]string{"scrape_job"},
	)
	// targetScrapeSampleLimit is incremented at most once per scrape, when
	// the appender reported errSampleLimit for at least one sample.
	targetScrapeSampleLimit = prometheus.NewCounter(
		prometheus.CounterOpts{
			Name: "prometheus_target_scrapes_exceeded_sample_limit_total",
			Help: "Total number of scrapes that hit the sample limit and were rejected.",
		},
	)
	// targetScrapeSampleDuplicate counts samples for which storage returned
	// ErrDuplicateSampleForTimestamp.
	targetScrapeSampleDuplicate = prometheus.NewCounter(
		prometheus.CounterOpts{
			Name: "prometheus_target_scrapes_sample_duplicate_timestamp_total",
			Help: "Total number of samples rejected due to duplicate timestamps but different values",
		},
	)
	// targetScrapeSampleOutOfOrder counts samples for which storage returned
	// ErrOutOfOrderSample.
	targetScrapeSampleOutOfOrder = prometheus.NewCounter(
		prometheus.CounterOpts{
			Name: "prometheus_target_scrapes_sample_out_of_order_total",
			Help: "Total number of samples rejected due to not being out of the expected order",
		},
	)
	// targetScrapeSampleOutOfBounds counts samples for which storage returned
	// ErrOutOfBounds.
	targetScrapeSampleOutOfBounds = prometheus.NewCounter(
		prometheus.CounterOpts{
			Name: "prometheus_target_scrapes_sample_out_of_bounds_total",
			Help: "Total number of samples rejected due to timestamp falling outside of the time bounds",
		},
	)
)
|
|
|
|
func init() {
|
|
prometheus.MustRegister(targetIntervalLength)
|
|
prometheus.MustRegister(targetReloadIntervalLength)
|
|
prometheus.MustRegister(targetSyncIntervalLength)
|
|
prometheus.MustRegister(targetScrapePoolSyncsCounter)
|
|
prometheus.MustRegister(targetScrapeSampleLimit)
|
|
prometheus.MustRegister(targetScrapeSampleDuplicate)
|
|
prometheus.MustRegister(targetScrapeSampleOutOfOrder)
|
|
prometheus.MustRegister(targetScrapeSampleOutOfBounds)
|
|
}
|
|
|
|
// scrapePool manages scrapes for sets of targets.
type scrapePool struct {
	// appendable provides the storage appenders used by the scrape loops.
	appendable Appendable
	logger     log.Logger

	// mtx is held by the pool's methods while they read or modify config,
	// client, activeTargets, droppedTargets and loops.
	mtx    sync.RWMutex
	config *config.ScrapeConfig
	client *http.Client
	// Targets and loops must always be synchronized to have the same
	// set of hashes.
	activeTargets  map[uint64]*Target
	droppedTargets []*Target
	loops          map[uint64]loop
	// cancel cancels the context created in newScrapePool, which the
	// default newLoop passes to every scrape loop it builds.
	cancel context.CancelFunc

	// Constructor for new scrape loops. This is settable for testing convenience.
	// The arguments are the target, its scraper, the sample limit, the
	// honor-labels flag and the metric relabel configuration.
	newLoop func(*Target, scraper, int, bool, []*config.RelabelConfig) loop
}
|
|
|
|
// maxAheadTime is how far beyond the current time a sample timestamp may
// lie; appender uses it to compute the maxTime of the timeLimitAppender.
const maxAheadTime = 10 * time.Minute

// labelsMutator transforms a label set and returns the result. A nil
// result from the sample mutator is treated by scrapeLoop.append as a
// request to drop the series.
type labelsMutator func(labels.Labels) labels.Labels
|
|
|
|
func newScrapePool(cfg *config.ScrapeConfig, app Appendable, logger log.Logger) *scrapePool {
|
|
if logger == nil {
|
|
logger = log.NewNopLogger()
|
|
}
|
|
|
|
client, err := config_util.NewClientFromConfig(cfg.HTTPClientConfig, cfg.JobName)
|
|
if err != nil {
|
|
// Any errors that could occur here should be caught during config validation.
|
|
level.Error(logger).Log("msg", "Error creating HTTP client", "err", err)
|
|
}
|
|
|
|
buffers := pool.New(1e3, 100e6, 3, func(sz int) interface{} { return make([]byte, 0, sz) })
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
sp := &scrapePool{
|
|
cancel: cancel,
|
|
appendable: app,
|
|
config: cfg,
|
|
client: client,
|
|
activeTargets: map[uint64]*Target{},
|
|
loops: map[uint64]loop{},
|
|
logger: logger,
|
|
}
|
|
sp.newLoop = func(t *Target, s scraper, limit int, honor bool, mrc []*config.RelabelConfig) loop {
|
|
// Update the targets retrieval function for metadata to a new scrape cache.
|
|
cache := newScrapeCache()
|
|
t.setMetadataStore(cache)
|
|
|
|
return newScrapeLoop(
|
|
ctx,
|
|
s,
|
|
log.With(logger, "target", t),
|
|
buffers,
|
|
func(l labels.Labels) labels.Labels { return mutateSampleLabels(l, t, honor, mrc) },
|
|
func(l labels.Labels) labels.Labels { return mutateReportSampleLabels(l, t) },
|
|
func() storage.Appender {
|
|
app, err := app.Appender()
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
return appender(app, limit)
|
|
},
|
|
cache,
|
|
)
|
|
}
|
|
|
|
return sp
|
|
}
|
|
|
|
func (sp *scrapePool) ActiveTargets() []*Target {
|
|
sp.mtx.Lock()
|
|
defer sp.mtx.Unlock()
|
|
|
|
var tActive []*Target
|
|
for _, t := range sp.activeTargets {
|
|
tActive = append(tActive, t)
|
|
}
|
|
return tActive
|
|
}
|
|
|
|
func (sp *scrapePool) DroppedTargets() []*Target {
|
|
sp.mtx.Lock()
|
|
defer sp.mtx.Unlock()
|
|
return sp.droppedTargets
|
|
}
|
|
|
|
// stop terminates all scrape loops and returns after they all terminated.
|
|
func (sp *scrapePool) stop() {
|
|
sp.cancel()
|
|
var wg sync.WaitGroup
|
|
|
|
sp.mtx.Lock()
|
|
defer sp.mtx.Unlock()
|
|
|
|
for fp, l := range sp.loops {
|
|
wg.Add(1)
|
|
|
|
go func(l loop) {
|
|
l.stop()
|
|
wg.Done()
|
|
}(l)
|
|
|
|
delete(sp.loops, fp)
|
|
delete(sp.activeTargets, fp)
|
|
}
|
|
wg.Wait()
|
|
}
|
|
|
|
// reload the scrape pool with the given scrape configuration. The target state is preserved
|
|
// but all scrape loops are restarted with the new scrape configuration.
|
|
// This method returns after all scrape loops that were stopped have stopped scraping.
|
|
func (sp *scrapePool) reload(cfg *config.ScrapeConfig) {
|
|
start := time.Now()
|
|
|
|
sp.mtx.Lock()
|
|
defer sp.mtx.Unlock()
|
|
|
|
client, err := config_util.NewClientFromConfig(cfg.HTTPClientConfig, cfg.JobName)
|
|
if err != nil {
|
|
// Any errors that could occur here should be caught during config validation.
|
|
level.Error(sp.logger).Log("msg", "Error creating HTTP client", "err", err)
|
|
}
|
|
sp.config = cfg
|
|
sp.client = client
|
|
|
|
var (
|
|
wg sync.WaitGroup
|
|
interval = time.Duration(sp.config.ScrapeInterval)
|
|
timeout = time.Duration(sp.config.ScrapeTimeout)
|
|
limit = int(sp.config.SampleLimit)
|
|
honor = sp.config.HonorLabels
|
|
mrc = sp.config.MetricRelabelConfigs
|
|
)
|
|
|
|
for fp, oldLoop := range sp.loops {
|
|
var (
|
|
t = sp.activeTargets[fp]
|
|
s = &targetScraper{Target: t, client: sp.client, timeout: timeout}
|
|
newLoop = sp.newLoop(t, s, limit, honor, mrc)
|
|
)
|
|
wg.Add(1)
|
|
|
|
go func(oldLoop, newLoop loop) {
|
|
oldLoop.stop()
|
|
wg.Done()
|
|
|
|
go newLoop.run(interval, timeout, nil)
|
|
}(oldLoop, newLoop)
|
|
|
|
sp.loops[fp] = newLoop
|
|
}
|
|
|
|
wg.Wait()
|
|
targetReloadIntervalLength.WithLabelValues(interval.String()).Observe(
|
|
time.Since(start).Seconds(),
|
|
)
|
|
}
|
|
|
|
// Sync converts target groups into actual scrape targets and synchronizes
|
|
// the currently running scraper with the resulting set and returns all scraped and dropped targets.
|
|
func (sp *scrapePool) Sync(tgs []*targetgroup.Group) {
|
|
start := time.Now()
|
|
|
|
var all []*Target
|
|
sp.mtx.Lock()
|
|
sp.droppedTargets = []*Target{}
|
|
for _, tg := range tgs {
|
|
targets, err := targetsFromGroup(tg, sp.config)
|
|
if err != nil {
|
|
level.Error(sp.logger).Log("msg", "creating targets failed", "err", err)
|
|
continue
|
|
}
|
|
for _, t := range targets {
|
|
if t.Labels().Len() > 0 {
|
|
all = append(all, t)
|
|
} else if t.DiscoveredLabels().Len() > 0 {
|
|
sp.droppedTargets = append(sp.droppedTargets, t)
|
|
}
|
|
}
|
|
}
|
|
sp.mtx.Unlock()
|
|
sp.sync(all)
|
|
|
|
targetSyncIntervalLength.WithLabelValues(sp.config.JobName).Observe(
|
|
time.Since(start).Seconds(),
|
|
)
|
|
targetScrapePoolSyncsCounter.WithLabelValues(sp.config.JobName).Inc()
|
|
}
|
|
|
|
// sync takes a list of potentially duplicated targets, deduplicates them, starts
|
|
// scrape loops for new targets, and stops scrape loops for disappeared targets.
|
|
// It returns after all stopped scrape loops terminated.
|
|
func (sp *scrapePool) sync(targets []*Target) {
|
|
sp.mtx.Lock()
|
|
defer sp.mtx.Unlock()
|
|
|
|
var (
|
|
uniqueTargets = map[uint64]struct{}{}
|
|
interval = time.Duration(sp.config.ScrapeInterval)
|
|
timeout = time.Duration(sp.config.ScrapeTimeout)
|
|
limit = int(sp.config.SampleLimit)
|
|
honor = sp.config.HonorLabels
|
|
mrc = sp.config.MetricRelabelConfigs
|
|
)
|
|
|
|
for _, t := range targets {
|
|
t := t
|
|
hash := t.hash()
|
|
uniqueTargets[hash] = struct{}{}
|
|
|
|
if _, ok := sp.activeTargets[hash]; !ok {
|
|
s := &targetScraper{Target: t, client: sp.client, timeout: timeout}
|
|
l := sp.newLoop(t, s, limit, honor, mrc)
|
|
|
|
sp.activeTargets[hash] = t
|
|
sp.loops[hash] = l
|
|
|
|
go l.run(interval, timeout, nil)
|
|
} else {
|
|
// Need to keep the most updated labels information
|
|
// for displaying it in the Service Discovery web page.
|
|
sp.activeTargets[hash].SetDiscoveredLabels(t.DiscoveredLabels())
|
|
}
|
|
}
|
|
|
|
var wg sync.WaitGroup
|
|
|
|
// Stop and remove old targets and scraper loops.
|
|
for hash := range sp.activeTargets {
|
|
if _, ok := uniqueTargets[hash]; !ok {
|
|
wg.Add(1)
|
|
go func(l loop) {
|
|
|
|
l.stop()
|
|
|
|
wg.Done()
|
|
}(sp.loops[hash])
|
|
|
|
delete(sp.loops, hash)
|
|
delete(sp.activeTargets, hash)
|
|
}
|
|
}
|
|
|
|
// Wait for all potentially stopped scrapers to terminate.
|
|
// This covers the case of flapping targets. If the server is under high load, a new scraper
|
|
// may be active and tries to insert. The old scraper that didn't terminate yet could still
|
|
// be inserting a previous sample set.
|
|
wg.Wait()
|
|
}
|
|
|
|
func mutateSampleLabels(lset labels.Labels, target *Target, honor bool, rc []*config.RelabelConfig) labels.Labels {
|
|
lb := labels.NewBuilder(lset)
|
|
|
|
if honor {
|
|
for _, l := range target.Labels() {
|
|
if !lset.Has(l.Name) {
|
|
lb.Set(l.Name, l.Value)
|
|
}
|
|
}
|
|
} else {
|
|
for _, l := range target.Labels() {
|
|
lv := lset.Get(l.Name)
|
|
if lv != "" {
|
|
lb.Set(model.ExportedLabelPrefix+l.Name, lv)
|
|
}
|
|
lb.Set(l.Name, l.Value)
|
|
}
|
|
}
|
|
|
|
for _, l := range lb.Labels() {
|
|
if l.Value == "" {
|
|
lb.Del(l.Name)
|
|
}
|
|
}
|
|
|
|
res := lb.Labels()
|
|
|
|
if len(rc) > 0 {
|
|
res = relabel.Process(res, rc...)
|
|
}
|
|
|
|
return res
|
|
}
|
|
|
|
func mutateReportSampleLabels(lset labels.Labels, target *Target) labels.Labels {
|
|
lb := labels.NewBuilder(lset)
|
|
|
|
for _, l := range target.Labels() {
|
|
lv := lset.Get(l.Name)
|
|
if lv != "" {
|
|
lb.Set(model.ExportedLabelPrefix+l.Name, lv)
|
|
}
|
|
lb.Set(l.Name, l.Value)
|
|
}
|
|
|
|
return lb.Labels()
|
|
}
|
|
|
|
// appender returns an appender for ingested samples from the target.
|
|
func appender(app storage.Appender, limit int) storage.Appender {
|
|
app = &timeLimitAppender{
|
|
Appender: app,
|
|
maxTime: timestamp.FromTime(time.Now().Add(maxAheadTime)),
|
|
}
|
|
|
|
// The limit is applied after metrics are potentially dropped via relabeling.
|
|
if limit > 0 {
|
|
app = &limitAppender{
|
|
Appender: app,
|
|
limit: limit,
|
|
}
|
|
}
|
|
return app
|
|
}
|
|
|
|
// A scraper retrieves samples and accepts a status report at the end.
type scraper interface {
	// scrape fetches the target's exposition payload and writes it to w.
	scrape(ctx context.Context, w io.Writer) error
	// report receives the start time, duration and resulting error of a
	// finished scrape.
	report(start time.Time, dur time.Duration, err error)
	// offset returns how long a scrape loop waits before its first scrape
	// for the given interval.
	offset(interval time.Duration) time.Duration
}
|
|
|
|
// targetScraper implements the scraper interface for a target.
type targetScraper struct {
	*Target

	client *http.Client
	// req is built lazily on the first scrape and reused afterwards.
	req *http.Request
	// timeout is advertised to the target in the
	// X-Prometheus-Scrape-Timeout-Seconds request header.
	timeout time.Duration

	// gzipr and buf are created on the first gzip-encoded response and
	// reset for every later one.
	gzipr *gzip.Reader
	buf   *bufio.Reader
}
|
|
|
|
// acceptHeader is the value of the Accept header sent with every scrape.
const acceptHeader = `text/plain;version=0.0.4;q=1,*/*;q=0.1`

// userAgentHeader is the value of the User-Agent header sent with every
// scrape; it embeds the build version.
var userAgentHeader = fmt.Sprintf("Prometheus/%s", version.Version)
|
|
|
|
func (s *targetScraper) scrape(ctx context.Context, w io.Writer) error {
|
|
if s.req == nil {
|
|
req, err := http.NewRequest("GET", s.URL().String(), nil)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
req.Header.Add("Accept", acceptHeader)
|
|
req.Header.Add("Accept-Encoding", "gzip")
|
|
req.Header.Set("User-Agent", userAgentHeader)
|
|
req.Header.Set("X-Prometheus-Scrape-Timeout-Seconds", fmt.Sprintf("%f", s.timeout.Seconds()))
|
|
|
|
s.req = req
|
|
}
|
|
|
|
resp, err := ctxhttp.Do(ctx, s.client, s.req)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer resp.Body.Close()
|
|
|
|
if resp.StatusCode != http.StatusOK {
|
|
return fmt.Errorf("server returned HTTP status %s", resp.Status)
|
|
}
|
|
|
|
if resp.Header.Get("Content-Encoding") != "gzip" {
|
|
_, err = io.Copy(w, resp.Body)
|
|
return err
|
|
}
|
|
|
|
if s.gzipr == nil {
|
|
s.buf = bufio.NewReader(resp.Body)
|
|
s.gzipr, err = gzip.NewReader(s.buf)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
} else {
|
|
s.buf.Reset(resp.Body)
|
|
if err = s.gzipr.Reset(s.buf); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
_, err = io.Copy(w, s.gzipr)
|
|
s.gzipr.Close()
|
|
return err
|
|
}
|
|
|
|
// A loop can run and be stopped again. It must not be reused after it was stopped.
type loop interface {
	// run scrapes with the given interval and per-scrape timeout. The pool
	// always passes a nil errc; scrapeLoop sends scrape errors on a
	// non-nil errc.
	run(interval, timeout time.Duration, errc chan<- error)
	// stop ends the loop; the pool waits for it to return.
	stop()
}
|
|
|
|
// cacheEntry is what scrapeCache remembers about one exposed series string.
type cacheEntry struct {
	// ref is the storage reference returned by Appender.Add; it is passed
	// back to Appender.AddFast on later scrapes.
	ref uint64
	// lastIter is the cache iteration at which the entry was last looked up
	// or created.
	lastIter uint64
	// hash is the hash of the label set computed before the mutator was
	// applied; it keys the staleness tracking maps.
	hash uint64
	// lset is the label set after the mutator was applied.
	lset labels.Labels
}
|
|
|
|
// scrapeLoop periodically scrapes one target through its scraper and
// appends the result to storage.
type scrapeLoop struct {
	scraper scraper
	l       log.Logger
	cache   *scrapeCache
	// lastScrapeSize is the byte length of the last successful, non-empty
	// scrape; it is the size requested from buffers for the next scrape.
	lastScrapeSize int
	buffers        *pool.Pool

	// appender returns the storage appender used for one append or report.
	appender func() storage.Appender
	// sampleMutator is applied to the labels of scraped samples,
	// reportSampleMutator to the labels of the report samples.
	sampleMutator       labelsMutator
	reportSampleMutator labelsMutator

	// ctx is the parent context. Once it is done, run returns without
	// writing end-of-run stale markers.
	ctx context.Context
	// scrapeCtx is derived from ctx and is cancelled by stop via cancel.
	scrapeCtx context.Context
	cancel    func()
	// stopped is closed by run once scraping has ended.
	stopped chan struct{}
}
|
|
|
|
// scrapeCache tracks mappings of exposed metric strings to label sets and
// storage references. Additionally, it tracks staleness of series between
// scrapes.
type scrapeCache struct {
	iter uint64 // Current scrape iteration.

	// Parsed string to an entry with information about the actual label set
	// and its storage reference.
	series map[string]*cacheEntry

	// Cache of dropped metric strings and their iteration. The iteration must
	// be a pointer so we can update it without setting a new entry with an unsafe
	// string in getDropped().
	droppedSeries map[string]*uint64

	// seriesCur and seriesPrev store the labels of series that were seen
	// in the current and previous scrape.
	// We hold two maps and swap them out to save allocations.
	seriesCur  map[uint64]labels.Labels
	seriesPrev map[uint64]labels.Labels

	// metaMtx is held by every method that reads or modifies metadata.
	metaMtx  sync.Mutex
	metadata map[string]*metaEntry
}
|
|
|
|
// metaEntry holds meta information about a metric.
type metaEntry struct {
	lastIter uint64 // Last scrape iteration the entry was observed at.
	// typ is the metric type; entries start out as MetricTypeUntyped.
	typ textparse.MetricType
	// help is the metric's help text.
	help string
}
|
|
|
|
func newScrapeCache() *scrapeCache {
|
|
return &scrapeCache{
|
|
series: map[string]*cacheEntry{},
|
|
droppedSeries: map[string]*uint64{},
|
|
seriesCur: map[uint64]labels.Labels{},
|
|
seriesPrev: map[uint64]labels.Labels{},
|
|
metadata: map[string]*metaEntry{},
|
|
}
|
|
}
|
|
|
|
func (c *scrapeCache) iterDone() {
|
|
// All caches may grow over time through series churn
|
|
// or multiple string representations of the same metric. Clean up entries
|
|
// that haven't appeared in the last scrape.
|
|
for s, e := range c.series {
|
|
if c.iter-e.lastIter > 2 {
|
|
delete(c.series, s)
|
|
}
|
|
}
|
|
for s, iter := range c.droppedSeries {
|
|
if c.iter-*iter > 2 {
|
|
delete(c.droppedSeries, s)
|
|
}
|
|
}
|
|
c.metaMtx.Lock()
|
|
for m, e := range c.metadata {
|
|
// Keep metadata around for 10 scrapes after its metric disappeared.
|
|
if c.iter-e.lastIter > 10 {
|
|
delete(c.metadata, m)
|
|
}
|
|
}
|
|
c.metaMtx.Unlock()
|
|
|
|
// Swap current and previous series.
|
|
c.seriesPrev, c.seriesCur = c.seriesCur, c.seriesPrev
|
|
|
|
// We have to delete every single key in the map.
|
|
for k := range c.seriesCur {
|
|
delete(c.seriesCur, k)
|
|
}
|
|
|
|
c.iter++
|
|
}
|
|
|
|
func (c *scrapeCache) get(met string) (*cacheEntry, bool) {
|
|
e, ok := c.series[met]
|
|
if !ok {
|
|
return nil, false
|
|
}
|
|
e.lastIter = c.iter
|
|
return e, true
|
|
}
|
|
|
|
func (c *scrapeCache) addRef(met string, ref uint64, lset labels.Labels, hash uint64) {
|
|
if ref == 0 {
|
|
return
|
|
}
|
|
c.series[met] = &cacheEntry{ref: ref, lastIter: c.iter, lset: lset, hash: hash}
|
|
}
|
|
|
|
func (c *scrapeCache) addDropped(met string) {
|
|
iter := c.iter
|
|
c.droppedSeries[met] = &iter
|
|
}
|
|
|
|
func (c *scrapeCache) getDropped(met string) bool {
|
|
iterp, ok := c.droppedSeries[met]
|
|
if ok {
|
|
*iterp = c.iter
|
|
}
|
|
return ok
|
|
}
|
|
|
|
// trackStaleness records the series identified by hash as seen in the
// current scrape, so that forEachStale does not report it.
func (c *scrapeCache) trackStaleness(hash uint64, lset labels.Labels) {
	c.seriesCur[hash] = lset
}
|
|
|
|
func (c *scrapeCache) forEachStale(f func(labels.Labels) bool) {
|
|
for h, lset := range c.seriesPrev {
|
|
if _, ok := c.seriesCur[h]; !ok {
|
|
if !f(lset) {
|
|
break
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (c *scrapeCache) setType(metric []byte, t textparse.MetricType) {
|
|
c.metaMtx.Lock()
|
|
|
|
e, ok := c.metadata[yoloString(metric)]
|
|
if !ok {
|
|
e = &metaEntry{typ: textparse.MetricTypeUntyped}
|
|
c.metadata[string(metric)] = e
|
|
}
|
|
e.typ = t
|
|
e.lastIter = c.iter
|
|
|
|
c.metaMtx.Unlock()
|
|
}
|
|
|
|
func (c *scrapeCache) setHelp(metric, help []byte) {
|
|
c.metaMtx.Lock()
|
|
|
|
e, ok := c.metadata[yoloString(metric)]
|
|
if !ok {
|
|
e = &metaEntry{typ: textparse.MetricTypeUntyped}
|
|
c.metadata[string(metric)] = e
|
|
}
|
|
if e.help != yoloString(help) {
|
|
e.help = string(help)
|
|
}
|
|
e.lastIter = c.iter
|
|
|
|
c.metaMtx.Unlock()
|
|
}
|
|
|
|
func (c *scrapeCache) getMetadata(metric string) (MetricMetadata, bool) {
|
|
c.metaMtx.Lock()
|
|
defer c.metaMtx.Unlock()
|
|
|
|
m, ok := c.metadata[metric]
|
|
if !ok {
|
|
return MetricMetadata{}, false
|
|
}
|
|
return MetricMetadata{
|
|
Metric: metric,
|
|
Type: m.typ,
|
|
Help: m.help,
|
|
}, true
|
|
}
|
|
|
|
func (c *scrapeCache) listMetadata() []MetricMetadata {
|
|
c.metaMtx.Lock()
|
|
defer c.metaMtx.Unlock()
|
|
|
|
res := make([]MetricMetadata, 0, len(c.metadata))
|
|
|
|
for m, e := range c.metadata {
|
|
res = append(res, MetricMetadata{
|
|
Metric: m,
|
|
Type: e.typ,
|
|
Help: e.help,
|
|
})
|
|
}
|
|
return res
|
|
}
|
|
|
|
func newScrapeLoop(ctx context.Context,
|
|
sc scraper,
|
|
l log.Logger,
|
|
buffers *pool.Pool,
|
|
sampleMutator labelsMutator,
|
|
reportSampleMutator labelsMutator,
|
|
appender func() storage.Appender,
|
|
cache *scrapeCache,
|
|
) *scrapeLoop {
|
|
if l == nil {
|
|
l = log.NewNopLogger()
|
|
}
|
|
if buffers == nil {
|
|
buffers = pool.New(1e3, 1e6, 3, func(sz int) interface{} { return make([]byte, 0, sz) })
|
|
}
|
|
if cache == nil {
|
|
cache = newScrapeCache()
|
|
}
|
|
sl := &scrapeLoop{
|
|
scraper: sc,
|
|
buffers: buffers,
|
|
cache: cache,
|
|
appender: appender,
|
|
sampleMutator: sampleMutator,
|
|
reportSampleMutator: reportSampleMutator,
|
|
stopped: make(chan struct{}),
|
|
l: l,
|
|
ctx: ctx,
|
|
}
|
|
sl.scrapeCtx, sl.cancel = context.WithCancel(ctx)
|
|
|
|
return sl
|
|
}
|
|
|
|
func (sl *scrapeLoop) run(interval, timeout time.Duration, errc chan<- error) {
|
|
select {
|
|
case <-time.After(sl.scraper.offset(interval)):
|
|
// Continue after a scraping offset.
|
|
case <-sl.scrapeCtx.Done():
|
|
close(sl.stopped)
|
|
return
|
|
}
|
|
|
|
var last time.Time
|
|
|
|
ticker := time.NewTicker(interval)
|
|
defer ticker.Stop()
|
|
|
|
buf := bytes.NewBuffer(make([]byte, 0, 16000))
|
|
|
|
mainLoop:
|
|
for {
|
|
buf.Reset()
|
|
select {
|
|
case <-sl.ctx.Done():
|
|
close(sl.stopped)
|
|
return
|
|
case <-sl.scrapeCtx.Done():
|
|
break mainLoop
|
|
default:
|
|
}
|
|
|
|
var (
|
|
start = time.Now()
|
|
scrapeCtx, cancel = context.WithTimeout(sl.ctx, timeout)
|
|
)
|
|
|
|
// Only record after the first scrape.
|
|
if !last.IsZero() {
|
|
targetIntervalLength.WithLabelValues(interval.String()).Observe(
|
|
time.Since(last).Seconds(),
|
|
)
|
|
}
|
|
|
|
b := sl.buffers.Get(sl.lastScrapeSize).([]byte)
|
|
buf := bytes.NewBuffer(b)
|
|
|
|
scrapeErr := sl.scraper.scrape(scrapeCtx, buf)
|
|
cancel()
|
|
|
|
if scrapeErr == nil {
|
|
b = buf.Bytes()
|
|
// NOTE: There were issues with misbehaving clients in the past
|
|
// that occasionally returned empty results. We don't want those
|
|
// to falsely reset our buffer size.
|
|
if len(b) > 0 {
|
|
sl.lastScrapeSize = len(b)
|
|
}
|
|
} else {
|
|
level.Debug(sl.l).Log("msg", "Scrape failed", "err", scrapeErr.Error())
|
|
if errc != nil {
|
|
errc <- scrapeErr
|
|
}
|
|
}
|
|
|
|
// A failed scrape is the same as an empty scrape,
|
|
// we still call sl.append to trigger stale markers.
|
|
total, added, appErr := sl.append(b, start)
|
|
if appErr != nil {
|
|
level.Warn(sl.l).Log("msg", "append failed", "err", appErr)
|
|
// The append failed, probably due to a parse error or sample limit.
|
|
// Call sl.append again with an empty scrape to trigger stale markers.
|
|
if _, _, err := sl.append([]byte{}, start); err != nil {
|
|
level.Warn(sl.l).Log("msg", "append failed", "err", err)
|
|
}
|
|
}
|
|
|
|
sl.buffers.Put(b)
|
|
|
|
if scrapeErr == nil {
|
|
scrapeErr = appErr
|
|
}
|
|
|
|
if err := sl.report(start, time.Since(start), total, added, scrapeErr); err != nil {
|
|
level.Warn(sl.l).Log("msg", "appending scrape report failed", "err", err)
|
|
}
|
|
last = start
|
|
|
|
select {
|
|
case <-sl.ctx.Done():
|
|
close(sl.stopped)
|
|
return
|
|
case <-sl.scrapeCtx.Done():
|
|
break mainLoop
|
|
case <-ticker.C:
|
|
}
|
|
}
|
|
|
|
close(sl.stopped)
|
|
|
|
sl.endOfRunStaleness(last, ticker, interval)
|
|
}
|
|
|
|
func (sl *scrapeLoop) endOfRunStaleness(last time.Time, ticker *time.Ticker, interval time.Duration) {
|
|
// Scraping has stopped. We want to write stale markers but
|
|
// the target may be recreated, so we wait just over 2 scrape intervals
|
|
// before creating them.
|
|
// If the context is cancelled, we presume the server is shutting down
|
|
// and will restart where is was. We do not attempt to write stale markers
|
|
// in this case.
|
|
|
|
if last.IsZero() {
|
|
// There never was a scrape, so there will be no stale markers.
|
|
return
|
|
}
|
|
|
|
// Wait for when the next scrape would have been, record its timestamp.
|
|
var staleTime time.Time
|
|
select {
|
|
case <-sl.ctx.Done():
|
|
return
|
|
case <-ticker.C:
|
|
staleTime = time.Now()
|
|
}
|
|
|
|
// Wait for when the next scrape would have been, if the target was recreated
|
|
// samples should have been ingested by now.
|
|
select {
|
|
case <-sl.ctx.Done():
|
|
return
|
|
case <-ticker.C:
|
|
}
|
|
|
|
// Wait for an extra 10% of the interval, just to be safe.
|
|
select {
|
|
case <-sl.ctx.Done():
|
|
return
|
|
case <-time.After(interval / 10):
|
|
}
|
|
|
|
// Call sl.append again with an empty scrape to trigger stale markers.
|
|
// If the target has since been recreated and scraped, the
|
|
// stale markers will be out of order and ignored.
|
|
if _, _, err := sl.append([]byte{}, staleTime); err != nil {
|
|
level.Error(sl.l).Log("msg", "stale append failed", "err", err)
|
|
}
|
|
if err := sl.reportStale(staleTime); err != nil {
|
|
level.Error(sl.l).Log("msg", "stale report failed", "err", err)
|
|
}
|
|
}
|
|
|
|
// stop cancels the scrape context and blocks until run has closed the
// stopped channel, i.e. until scraping has ended. The loop may still write
// data and stale markers after stop has returned. Cancel the parent context
// to stop all writes.
func (sl *scrapeLoop) stop() {
	sl.cancel()
	<-sl.stopped
}
|
|
|
|
type sample struct {
|
|
metric labels.Labels
|
|
t int64
|
|
v float64
|
|
}
|
|
|
|
type samples []sample
|
|
|
|
func (s samples) Len() int { return len(s) }
|
|
func (s samples) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
|
|
|
|
func (s samples) Less(i, j int) bool {
|
|
d := labels.Compare(s[i].metric, s[j].metric)
|
|
if d < 0 {
|
|
return true
|
|
} else if d > 0 {
|
|
return false
|
|
}
|
|
return s[i].t < s[j].t
|
|
}
|
|
|
|
func (sl *scrapeLoop) append(b []byte, ts time.Time) (total, added int, err error) {
|
|
var (
|
|
app = sl.appender()
|
|
p = textparse.New(b)
|
|
defTime = timestamp.FromTime(ts)
|
|
numOutOfOrder = 0
|
|
numDuplicates = 0
|
|
numOutOfBounds = 0
|
|
)
|
|
var sampleLimitErr error
|
|
|
|
loop:
|
|
for {
|
|
var et textparse.Entry
|
|
if et, err = p.Next(); err != nil {
|
|
if err == io.EOF {
|
|
err = nil
|
|
}
|
|
break
|
|
}
|
|
switch et {
|
|
case textparse.EntryType:
|
|
sl.cache.setType(p.Type())
|
|
continue
|
|
case textparse.EntryHelp:
|
|
sl.cache.setHelp(p.Help())
|
|
continue
|
|
case textparse.EntryComment:
|
|
continue
|
|
default:
|
|
}
|
|
total++
|
|
|
|
t := defTime
|
|
met, tp, v := p.Series()
|
|
if tp != nil {
|
|
t = *tp
|
|
}
|
|
|
|
if sl.cache.getDropped(yoloString(met)) {
|
|
continue
|
|
}
|
|
ce, ok := sl.cache.get(yoloString(met))
|
|
if ok {
|
|
switch err = app.AddFast(ce.lset, ce.ref, t, v); err {
|
|
case nil:
|
|
if tp == nil {
|
|
sl.cache.trackStaleness(ce.hash, ce.lset)
|
|
}
|
|
case storage.ErrNotFound:
|
|
ok = false
|
|
case storage.ErrOutOfOrderSample:
|
|
numOutOfOrder++
|
|
level.Debug(sl.l).Log("msg", "Out of order sample", "series", string(met))
|
|
targetScrapeSampleOutOfOrder.Inc()
|
|
continue
|
|
case storage.ErrDuplicateSampleForTimestamp:
|
|
numDuplicates++
|
|
level.Debug(sl.l).Log("msg", "Duplicate sample for timestamp", "series", string(met))
|
|
targetScrapeSampleDuplicate.Inc()
|
|
continue
|
|
case storage.ErrOutOfBounds:
|
|
numOutOfBounds++
|
|
level.Debug(sl.l).Log("msg", "Out of bounds metric", "series", string(met))
|
|
targetScrapeSampleOutOfBounds.Inc()
|
|
continue
|
|
case errSampleLimit:
|
|
// Keep on parsing output if we hit the limit, so we report the correct
|
|
// total number of samples scraped.
|
|
sampleLimitErr = err
|
|
added++
|
|
continue
|
|
default:
|
|
break loop
|
|
}
|
|
}
|
|
if !ok {
|
|
var lset labels.Labels
|
|
|
|
mets := p.Metric(&lset)
|
|
hash := lset.Hash()
|
|
|
|
// Hash label set as it is seen local to the target. Then add target labels
|
|
// and relabeling and store the final label set.
|
|
lset = sl.sampleMutator(lset)
|
|
|
|
// The label set may be set to nil to indicate dropping.
|
|
if lset == nil {
|
|
sl.cache.addDropped(mets)
|
|
continue
|
|
}
|
|
|
|
var ref uint64
|
|
ref, err = app.Add(lset, t, v)
|
|
// TODO(fabxc): also add a dropped-cache?
|
|
switch err {
|
|
case nil:
|
|
case storage.ErrOutOfOrderSample:
|
|
err = nil
|
|
numOutOfOrder++
|
|
level.Debug(sl.l).Log("msg", "Out of order sample", "series", string(met))
|
|
targetScrapeSampleOutOfOrder.Inc()
|
|
continue
|
|
case storage.ErrDuplicateSampleForTimestamp:
|
|
err = nil
|
|
numDuplicates++
|
|
level.Debug(sl.l).Log("msg", "Duplicate sample for timestamp", "series", string(met))
|
|
targetScrapeSampleDuplicate.Inc()
|
|
continue
|
|
case storage.ErrOutOfBounds:
|
|
err = nil
|
|
numOutOfBounds++
|
|
level.Debug(sl.l).Log("msg", "Out of bounds metric", "series", string(met))
|
|
targetScrapeSampleOutOfBounds.Inc()
|
|
continue
|
|
case errSampleLimit:
|
|
sampleLimitErr = err
|
|
added++
|
|
continue
|
|
default:
|
|
level.Debug(sl.l).Log("msg", "unexpected error", "series", string(met), "err", err)
|
|
break loop
|
|
}
|
|
if tp == nil {
|
|
// Bypass staleness logic if there is an explicit timestamp.
|
|
sl.cache.trackStaleness(hash, lset)
|
|
}
|
|
sl.cache.addRef(mets, ref, lset, hash)
|
|
}
|
|
added++
|
|
}
|
|
if sampleLimitErr != nil {
|
|
if err == nil {
|
|
err = sampleLimitErr
|
|
}
|
|
// We only want to increment this once per scrape, so this is Inc'd outside the loop.
|
|
targetScrapeSampleLimit.Inc()
|
|
}
|
|
if numOutOfOrder > 0 {
|
|
level.Warn(sl.l).Log("msg", "Error on ingesting out-of-order samples", "num_dropped", numOutOfOrder)
|
|
}
|
|
if numDuplicates > 0 {
|
|
level.Warn(sl.l).Log("msg", "Error on ingesting samples with different value but same timestamp", "num_dropped", numDuplicates)
|
|
}
|
|
if numOutOfBounds > 0 {
|
|
level.Warn(sl.l).Log("msg", "Error on ingesting samples that are too old or are too far into the future", "num_dropped", numOutOfBounds)
|
|
}
|
|
if err == nil {
|
|
sl.cache.forEachStale(func(lset labels.Labels) bool {
|
|
// Series no longer exposed, mark it stale.
|
|
_, err = app.Add(lset, defTime, math.Float64frombits(value.StaleNaN))
|
|
switch err {
|
|
case storage.ErrOutOfOrderSample, storage.ErrDuplicateSampleForTimestamp:
|
|
// Do not count these in logging, as this is expected if a target
|
|
// goes away and comes back again with a new scrape loop.
|
|
err = nil
|
|
}
|
|
return err == nil
|
|
})
|
|
}
|
|
if err != nil {
|
|
app.Rollback()
|
|
return total, added, err
|
|
}
|
|
if err := app.Commit(); err != nil {
|
|
return total, added, err
|
|
}
|
|
|
|
sl.cache.iterDone()
|
|
|
|
return total, added, nil
|
|
}
|
|
|
|
// yoloString reinterprets b as a string without copying. The returned
// string shares memory with b, so its contents change when b is modified;
// callers must not keep it beyond the lifetime or mutation of b.
func yoloString(b []byte) string {
	view := (*string)(unsafe.Pointer(&b))
	return *view
}
|
|
|
|
// Cache keys of the report series written after every scrape.
// The constants are suffixed with the invalid \xff unicode rune to avoid collisions
// with scraped metrics in the cache. addReportSample strips the final byte
// again when it builds the actual metric name.
const (
	scrapeHealthMetricName       = "up" + "\xff"
	scrapeDurationMetricName     = "scrape_duration_seconds" + "\xff"
	scrapeSamplesMetricName      = "scrape_samples_scraped" + "\xff"
	samplesPostRelabelMetricName = "scrape_samples_post_metric_relabeling" + "\xff"
)
|
|
|
|
func (sl *scrapeLoop) report(start time.Time, duration time.Duration, scraped, appended int, err error) error {
|
|
sl.scraper.report(start, duration, err)
|
|
|
|
ts := timestamp.FromTime(start)
|
|
|
|
var health float64
|
|
if err == nil {
|
|
health = 1
|
|
}
|
|
app := sl.appender()
|
|
|
|
if err := sl.addReportSample(app, scrapeHealthMetricName, ts, health); err != nil {
|
|
app.Rollback()
|
|
return err
|
|
}
|
|
if err := sl.addReportSample(app, scrapeDurationMetricName, ts, duration.Seconds()); err != nil {
|
|
app.Rollback()
|
|
return err
|
|
}
|
|
if err := sl.addReportSample(app, scrapeSamplesMetricName, ts, float64(scraped)); err != nil {
|
|
app.Rollback()
|
|
return err
|
|
}
|
|
if err := sl.addReportSample(app, samplesPostRelabelMetricName, ts, float64(appended)); err != nil {
|
|
app.Rollback()
|
|
return err
|
|
}
|
|
return app.Commit()
|
|
}
|
|
|
|
func (sl *scrapeLoop) reportStale(start time.Time) error {
|
|
ts := timestamp.FromTime(start)
|
|
app := sl.appender()
|
|
|
|
stale := math.Float64frombits(value.StaleNaN)
|
|
|
|
if err := sl.addReportSample(app, scrapeHealthMetricName, ts, stale); err != nil {
|
|
app.Rollback()
|
|
return err
|
|
}
|
|
if err := sl.addReportSample(app, scrapeDurationMetricName, ts, stale); err != nil {
|
|
app.Rollback()
|
|
return err
|
|
}
|
|
if err := sl.addReportSample(app, scrapeSamplesMetricName, ts, stale); err != nil {
|
|
app.Rollback()
|
|
return err
|
|
}
|
|
if err := sl.addReportSample(app, samplesPostRelabelMetricName, ts, stale); err != nil {
|
|
app.Rollback()
|
|
return err
|
|
}
|
|
return app.Commit()
|
|
}
|
|
|
|
func (sl *scrapeLoop) addReportSample(app storage.Appender, s string, t int64, v float64) error {
|
|
ce, ok := sl.cache.get(s)
|
|
if ok {
|
|
err := app.AddFast(ce.lset, ce.ref, t, v)
|
|
switch err {
|
|
case nil:
|
|
return nil
|
|
case storage.ErrNotFound:
|
|
// Try an Add.
|
|
case storage.ErrOutOfOrderSample, storage.ErrDuplicateSampleForTimestamp:
|
|
// Do not log here, as this is expected if a target goes away and comes back
|
|
// again with a new scrape loop.
|
|
return nil
|
|
default:
|
|
return err
|
|
}
|
|
}
|
|
lset := labels.Labels{
|
|
// The constants are suffixed with the invalid \xff unicode rune to avoid collisions
|
|
// with scraped metrics in the cache.
|
|
// We have to drop it when building the actual metric.
|
|
labels.Label{Name: labels.MetricName, Value: s[:len(s)-1]},
|
|
}
|
|
|
|
hash := lset.Hash()
|
|
lset = sl.reportSampleMutator(lset)
|
|
|
|
ref, err := app.Add(lset, t, v)
|
|
switch err {
|
|
case nil:
|
|
sl.cache.addRef(s, ref, lset, hash)
|
|
return nil
|
|
case storage.ErrOutOfOrderSample, storage.ErrDuplicateSampleForTimestamp:
|
|
return nil
|
|
default:
|
|
return err
|
|
}
|
|
}
|