scrape: Added option to scrape without initial jitter and on shutdown.

To experiment with the Prometheus pull method in semi-short-lived serverless environments,
we (Google Cloud) want to attempt init and shutdown scrapes so we have some basic data from
short-lived workloads. This makes little sense for the Prometheus scrape manager itself,
but it might be useful for external importers like OpenTelemetry and distributions,
which can be deployed in various configurations (e.g. a sidecar with just a single target).
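
For illustration, this is roughly how an external importer could wire the two additions
(the IgnoreJitter option and StopAfterScrapeAttempt, see the diff below) together. Just a
sketch: the nil appendable, the empty config and the tsets plumbing are placeholders for
whatever the importer already has.

    package main

    import (
        "os"
        "os/signal"
        "syscall"
        "time"

        "github.com/go-kit/log"
        "github.com/prometheus/prometheus/config"
        "github.com/prometheus/prometheus/discovery/targetgroup"
        "github.com/prometheus/prometheus/scrape"
    )

    func main() {
        logger := log.NewLogfmtLogger(os.Stderr)

        // The appendable would be the importer's own storage; nil only keeps the sketch short.
        mgr := scrape.NewManager(&scrape.Options{
            // Scrape targets as soon as they are discovered instead of waiting
            // for the usual per-target jitter offset.
            IgnoreJitter: true,
        }, logger, nil)

        // Scrape config for e.g. the single sidecar-local target.
        if err := mgr.ApplyConfig(&config.Config{}); err != nil {
            panic(err)
        }

        tsets := make(chan map[string][]*targetgroup.Group)
        go mgr.Run(tsets)
        // ... feed tsets from service discovery or a static target ...

        term := make(chan os.Signal, 1)
        signal.Notify(term, syscall.SIGTERM, os.Interrupt)
        <-term

        // On shutdown, make sure every target gets at least one scrape attempt
        // after "now", so a short-lived workload still exports a final sample.
        mgr.StopAfterScrapeAttempt(time.Now())
    }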

It's up to us to merge this or close it. I would be fine maintaining it upstream, but we
might as well keep it in our fork to experiment with, and perhaps merge it once the official
OTel contrib decides to use these options.

NOTE: Also added a high-level scrape manager test. I think it was kind of bad that we never
had a test integrating all the scrape manager pieces. Can add that in a separate PR as well.

Alternatives attempted:

* Manager Option for scrape on shutdown. This was not trivial due to the three different contexts we pass. We would need to disconnect them from the parent context (sometimes) for scrapeAndReport. Intrusive and prone to mistaken cancellations.
* ForceScrape method. This is not trivial, as the scrape would need additional locking. Plus, the semantics of what to do afterwards (continue the interval?) are not clear. We can scope this problem down to stopAndScrape semantics.

Signed-off-by: bwplotka <bwplotka@gmail.com>
bwplotka 2023-08-11 12:02:41 +01:00
parent 4a56a6bf59
commit 8600c1410a
7 changed files with 390 additions and 172 deletions

View file

@ -927,9 +927,9 @@ func main() {
// we wait until the config is fully loaded.
<-reloadReady.C
err := scrapeManager.Run(discoveryManagerScrape.SyncCh())
scrapeManager.Run(discoveryManagerScrape.SyncCh())
level.Info(logger).Log("msg", "Scrape manager stopped")
return err
return nil
},
func(err error) {
// Scrape manager needs to be stopped before closing the local TSDB

View file

@ -18,6 +18,7 @@ import (
"fmt"
"math/rand"
"strings"
"sync"
"github.com/prometheus/prometheus/model/exemplar"
"github.com/prometheus/prometheus/model/histogram"
@ -65,9 +66,20 @@ type histogramSample struct {
fh *histogram.FloatHistogram
}
type collectResultAppendable struct {
*collectResultAppender
}
func (a *collectResultAppendable) Appender(_ context.Context) storage.Appender {
return a
}
// collectResultAppender records all samples that were added through the appender.
// It can be used as its zero value or be backed by another appender it writes samples through.
// Goroutine-safe.
type collectResultAppender struct {
mtx sync.Mutex
next storage.Appender
resultFloats []floatSample
pendingFloats []floatSample
@ -82,6 +94,9 @@ type collectResultAppender struct {
}
func (a *collectResultAppender) Append(ref storage.SeriesRef, lset labels.Labels, t int64, v float64) (storage.SeriesRef, error) {
a.mtx.Lock()
defer a.mtx.Unlock()
a.pendingFloats = append(a.pendingFloats, floatSample{
metric: lset,
t: t,
@ -103,6 +118,9 @@ func (a *collectResultAppender) Append(ref storage.SeriesRef, lset labels.Labels
}
func (a *collectResultAppender) AppendExemplar(ref storage.SeriesRef, l labels.Labels, e exemplar.Exemplar) (storage.SeriesRef, error) {
a.mtx.Lock()
defer a.mtx.Unlock()
a.pendingExemplars = append(a.pendingExemplars, e)
if a.next == nil {
return 0, nil
@ -112,6 +130,9 @@ func (a *collectResultAppender) AppendExemplar(ref storage.SeriesRef, l labels.L
}
func (a *collectResultAppender) AppendHistogram(ref storage.SeriesRef, l labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) {
a.mtx.Lock()
defer a.mtx.Unlock()
a.pendingHistograms = append(a.pendingHistograms, histogramSample{h: h, fh: fh, t: t})
if a.next == nil {
return 0, nil
@ -121,6 +142,9 @@ func (a *collectResultAppender) AppendHistogram(ref storage.SeriesRef, l labels.
}
func (a *collectResultAppender) UpdateMetadata(ref storage.SeriesRef, l labels.Labels, m metadata.Metadata) (storage.SeriesRef, error) {
a.mtx.Lock()
defer a.mtx.Unlock()
a.pendingMetadata = append(a.pendingMetadata, m)
if ref == 0 {
ref = storage.SeriesRef(rand.Uint64())
@ -133,6 +157,9 @@ func (a *collectResultAppender) UpdateMetadata(ref storage.SeriesRef, l labels.L
}
func (a *collectResultAppender) Commit() error {
a.mtx.Lock()
defer a.mtx.Unlock()
a.resultFloats = append(a.resultFloats, a.pendingFloats...)
a.resultExemplars = append(a.resultExemplars, a.pendingExemplars...)
a.resultHistograms = append(a.resultHistograms, a.pendingHistograms...)
@ -148,6 +175,9 @@ func (a *collectResultAppender) Commit() error {
}
func (a *collectResultAppender) Rollback() error {
a.mtx.Lock()
defer a.mtx.Unlock()
a.rolledbackFloats = a.pendingFloats
a.rolledbackHistograms = a.pendingHistograms
a.pendingFloats = nil
@ -159,6 +189,9 @@ func (a *collectResultAppender) Rollback() error {
}
func (a *collectResultAppender) String() string {
a.mtx.Lock()
defer a.mtx.Unlock()
var sb strings.Builder
for _, s := range a.resultFloats {
sb.WriteString(fmt.Sprintf("committed: %s %f %d\n", s.metric, s.f, s.t))

View file

@ -140,6 +140,13 @@ type Options struct {
// Optional HTTP client options to use when scraping.
HTTPClientOptions []config_util.HTTPClientOption
// IgnoreJitter causes all targets managed by this manager to be scraped
// as soon as they are discovered. By default, all targets have an offset,
// so the scraping load is spread evenly within the Prometheus server.
// NOTE(bwplotka): This option is experimental and not used by Prometheus.
// It was created for serverless flavors of OpenTelemetry contrib's prometheusreceiver.
IgnoreJitter bool
}
// Manager maintains a set of scrape pools and manages start/stop cycles
@ -161,7 +168,7 @@ type Manager struct {
// Run receives and saves target set updates and triggers the scraping loops reloading.
// Reloading happens in the background so that it doesn't block receiving targets updates.
func (m *Manager) Run(tsets <-chan map[string][]*targetgroup.Group) error {
func (m *Manager) Run(tsets <-chan map[string][]*targetgroup.Group) {
go m.reloader()
for {
select {
@ -174,7 +181,7 @@ func (m *Manager) Run(tsets <-chan map[string][]*targetgroup.Group) error {
}
case <-m.graceShut:
return nil
return
}
}
}
@ -259,6 +266,40 @@ func (m *Manager) Stop() {
close(m.graceShut)
}
// StopAfterScrapeAttempt stops the manager after ensuring all targets' last scrape
// attempt happened after minScrapeTime. It cancels all running scrape pools and
// blocks until all have exited.
//
// Such a shutdown scrape will likely cause irregular scrape intervals and sudden
// resource (memory, CPU) spikes. However, it can be a fair tradeoff for a small
// number of short-lived targets, where at least one or two samples can be
// preserved (and ideally aggregated further by separate processes).
//
// NOTE(bwplotka): This method is experimental and not used by Prometheus.
// It was created for serverless flavors of OpenTelemetry contrib's prometheusreceiver.
func (m *Manager) StopAfterScrapeAttempt(minScrapeTime time.Time) {
m.mtxScrape.Lock()
defer m.mtxScrape.Unlock()
var wg sync.WaitGroup
for _, p := range m.scrapePools {
for _, l := range p.loops {
l := l
wg.Add(1)
go func() {
l.stopAfterScrapeAttempt(minScrapeTime)
wg.Done()
}()
}
}
wg.Wait()
for _, sp := range m.scrapePools {
sp.stop()
}
close(m.graceShut)
}
func (m *Manager) updateTsets(tsets map[string][]*targetgroup.Group) {
m.mtxScrape.Lock()
m.targetSets = tsets

View file

@ -15,11 +15,16 @@ package scrape
import (
"context"
"errors"
"net/http"
"net/http/httptest"
"net/url"
"os"
"strconv"
"testing"
"time"
"github.com/go-kit/log"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/require"
"gopkg.in/yaml.v2"
@ -29,6 +34,7 @@ import (
"github.com/prometheus/prometheus/discovery/targetgroup"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/relabel"
"github.com/prometheus/prometheus/util/runutil"
)
func TestPopulateLabels(t *testing.T) {
@ -512,6 +518,7 @@ scrape_configs:
logger: nil,
config: cfg1.ScrapeConfigs[0],
client: http.DefaultClient,
opts: &opts,
}
scrapeManager.scrapePools = map[string]*scrapePool{
"job1": sp,
@ -694,8 +701,7 @@ scrape_configs:
}
}
opts := Options{}
scrapeManager := NewManager(&opts, nil, nil)
scrapeManager := NewManager(&Options{}, nil, nil)
reload(scrapeManager, cfg1)
require.ElementsMatch(t, []string{"job1", "job2"}, scrapeManager.ScrapePools())
@ -703,3 +709,127 @@ scrape_configs:
reload(scrapeManager, cfg2)
require.ElementsMatch(t, []string{"job1", "job3"}, scrapeManager.ScrapePools())
}
func TestManagerStopAfterScrapeAttempt(t *testing.T) {
for _, tcase := range []struct {
name string
noJitter bool
stop func(m *Manager)
expectedSamples int
}{
{
name: "no scrape stop, no jitter",
noJitter: true,
stop: func(m *Manager) { m.Stop() },
expectedSamples: 1,
},
{
name: "no scrape on stop, with jitter",
stop: func(m *Manager) { m.Stop() },
expectedSamples: 0,
},
{
name: "scrape on stop, no jitter",
noJitter: true,
stop: func(m *Manager) { m.StopAfterScrapeAttempt(time.Now()) },
expectedSamples: 2,
},
{
name: "scrape on stop, but initial sample is fresh enough, no jitter",
noJitter: true,
stop: func(m *Manager) { m.StopAfterScrapeAttempt(time.Now().Add(-1 * time.Hour)) },
expectedSamples: 1,
},
{
name: "scrape on stop, with jitter",
stop: func(m *Manager) { m.StopAfterScrapeAttempt(time.Now()) },
expectedSamples: 1,
},
} {
t.Run(tcase.name, func(t *testing.T) {
app := &collectResultAppender{}
// Setup scrape manager.
scrapeManager := NewManager(&Options{
IgnoreJitter: tcase.noJitter,
// Extremely high value to turn it off. We don't want to wait minimum 5s, so
// we reload manually.
// TODO(bwplotka): Make scrape manager more testable.
DiscoveryReloadInterval: model.Duration(99 * time.Hour),
}, log.NewLogfmtLogger(os.Stderr), &collectResultAppendable{app})
require.NoError(t, scrapeManager.ApplyConfig(&config.Config{
GlobalConfig: config.GlobalConfig{
// Extremely high scrape interval, to ensure the only chance to see the
// sample is on start and stopAfterScrapeAttempt.
ScrapeInterval: model.Duration(99 * time.Hour),
ScrapeTimeout: model.Duration(10 * time.Second),
},
ScrapeConfigs: []*config.ScrapeConfig{{JobName: "test"}},
}))
// Start fake HTTP target to scrape returning a single metric.
server := httptest.NewServer(
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", `text/plain; version=0.0.4`)
w.Write([]byte("expected_metric 1\n"))
}),
)
defer server.Close()
serverURL, err := url.Parse(server.URL)
require.NoError(t, err)
// Add fake target directly into tsets + reload. Normally users would use
// Manager.Run and wait for minimum 5s refresh interval.
scrapeManager.updateTsets(map[string][]*targetgroup.Group{
"test": {
{
Targets: []model.LabelSet{{
model.SchemeLabel: model.LabelValue(serverURL.Scheme),
model.AddressLabel: model.LabelValue(serverURL.Host),
}},
},
},
})
scrapeManager.reload()
// At this point the first sample is scheduled to be scraped after the initial
// jitter in the background scrape loop goroutine.
//
// With jitter, the first sample will only appear after a long time, given the
// extremely long scrape interval configured. We stop right away and expect only
// the sample from the shutdown scrape (if any).
//
// With the no-jitter setting, we expect the first sample to be added straight
// away, so wait for it before stopping.
if tcase.noJitter {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
require.NoError(t, runutil.Retry(100*time.Millisecond, ctx.Done(), func() error {
if countFloatSamples(app, "expected_metric") < 1 {
return errors.New("expected at least one expected_metric sample")
}
return nil
}), "after 5 seconds")
}
tcase.stop(scrapeManager)
require.Equal(t, tcase.expectedSamples, countFloatSamples(app, "expected_metric"))
})
}
}
func countFloatSamples(a *collectResultAppender, expectedMetricName string) (count int) {
a.mtx.Lock()
defer a.mtx.Unlock()
for _, f := range a.resultFloats {
if f.metric.Get(model.MetricNameLabel) == expectedMetricName {
count++
}
}
return count
}

View file

@ -231,7 +231,6 @@ type scrapePool struct {
appendable storage.Appendable
logger log.Logger
cancel context.CancelFunc
httpOpts []config_util.HTTPClientOption
// mtx must not be taken after targetMtx.
mtx sync.Mutex
@ -248,9 +247,7 @@ type scrapePool struct {
// Constructor for new scrape loops. This is settable for testing convenience.
newLoop func(scrapeLoopOptions) loop
noDefaultPort bool
enableProtobufNegotiation bool
opts *Options
}
type labelLimits struct {
@ -295,16 +292,14 @@ func newScrapePool(cfg *config.ScrapeConfig, app storage.Appendable, offsetSeed
ctx, cancel := context.WithCancel(context.Background())
sp := &scrapePool{
cancel: cancel,
appendable: app,
config: cfg,
client: client,
activeTargets: map[uint64]*Target{},
loops: map[uint64]loop{},
logger: logger,
httpOpts: options.HTTPClientOptions,
noDefaultPort: options.NoDefaultPort,
enableProtobufNegotiation: options.EnableProtobufNegotiation,
cancel: cancel,
appendable: app,
config: cfg,
client: client,
activeTargets: map[uint64]*Target{},
loops: map[uint64]loop{},
logger: logger,
opts: options,
}
sp.newLoop = func(opts scrapeLoopOptions) loop {
// Update the targets retrieval function for metadata to a new scrape cache.
@ -333,10 +328,8 @@ func newScrapePool(cfg *config.ScrapeConfig, app storage.Appendable, offsetSeed
opts.interval,
opts.timeout,
opts.scrapeClassicHistograms,
options.ExtraMetrics,
options.EnableMetadataStorage,
opts.target,
options.PassMetadataInContext,
options,
)
}
targetScrapePoolTargetLimit.WithLabelValues(sp.config.JobName).Set(float64(sp.config.TargetLimit))
@ -404,7 +397,7 @@ func (sp *scrapePool) reload(cfg *config.ScrapeConfig) error {
targetScrapePoolReloads.Inc()
start := time.Now()
client, err := config_util.NewClientFromConfig(cfg.HTTPClientConfig, cfg.JobName, sp.httpOpts...)
client, err := config_util.NewClientFromConfig(cfg.HTTPClientConfig, cfg.JobName, sp.opts.HTTPClientOptions...)
if err != nil {
targetScrapePoolReloadsFailed.Inc()
return errors.Wrap(err, "error creating HTTP client")
@ -449,7 +442,7 @@ func (sp *scrapePool) reload(cfg *config.ScrapeConfig) error {
t := sp.activeTargets[fp]
interval, timeout, err := t.intervalAndTimeout(interval, timeout)
acceptHeader := scrapeAcceptHeader
if sp.enableProtobufNegotiation {
if sp.opts.EnableProtobufNegotiation {
acceptHeader = scrapeAcceptHeaderWithProtobuf
}
var (
@ -507,7 +500,7 @@ func (sp *scrapePool) Sync(tgs []*targetgroup.Group) {
lb := labels.NewBuilder(labels.EmptyLabels())
sp.droppedTargets = []*Target{}
for _, tg := range tgs {
targets, failures := TargetsFromGroup(tg, sp.config, sp.noDefaultPort, targets, lb)
targets, failures := TargetsFromGroup(tg, sp.config, sp.opts.NoDefaultPort, targets, lb)
for _, err := range failures {
level.Error(sp.logger).Log("msg", "Creating target failed", "err", err)
}
@ -566,7 +559,7 @@ func (sp *scrapePool) sync(targets []*Target) {
var err error
interval, timeout, err = t.intervalAndTimeout(interval, timeout)
acceptHeader := scrapeAcceptHeader
if sp.enableProtobufNegotiation {
if sp.opts.EnableProtobufNegotiation {
acceptHeader = scrapeAcceptHeaderWithProtobuf
}
s := &targetScraper{Target: t, client: sp.client, timeout: timeout, bodySizeLimit: bodySizeLimit, acceptHeader: acceptHeader}
@ -869,11 +862,15 @@ func (s *targetScraper) scrape(ctx context.Context, w io.Writer) (string, error)
return resp.Header.Get("Content-Type"), nil
}
// A loop can run and be stopped again. It must not be reused after it was stopped.
// A loop can run and be stopped. The same loop instance must not be reused
// after it was stopped.
// stop and stopAfterScrapeAttempt can be invoked multiple times, but after the
// first stop, further invocations do nothing.
type loop interface {
run(errc chan<- error)
setForcedError(err error)
stop()
stopAfterScrapeAttempt(minScrapeTime time.Time)
getCache() *scrapeCache
disableEndOfRunStalenessMarkers()
}
@ -913,9 +910,9 @@ type scrapeLoop struct {
stopped chan struct{}
disabledEndOfRunStalenessMarkers bool
stopAfterScrapeAttemptCh chan time.Time
reportExtraMetrics bool
appendMetadataToWAL bool
opts *Options
}
// scrapeCache tracks mappings of exposed metric strings to label sets and
@ -1148,7 +1145,7 @@ func (c *scrapeCache) ListMetadata() []MetricMetadata {
return res
}
// MetadataSize returns the size of the metadata cache.
// SizeMetadata returns the size of the metadata cache.
func (c *scrapeCache) SizeMetadata() (s int) {
c.metaMtx.Lock()
defer c.metaMtx.Unlock()
@ -1159,7 +1156,7 @@ func (c *scrapeCache) SizeMetadata() (s int) {
return s
}
// MetadataLen returns the number of metadata entries in the cache.
// LengthMetadata returns the number of metadata entries in the cache.
func (c *scrapeCache) LengthMetadata() int {
c.metaMtx.Lock()
defer c.metaMtx.Unlock()
@ -1167,7 +1164,8 @@ func (c *scrapeCache) LengthMetadata() int {
return len(c.metadata)
}
func newScrapeLoop(ctx context.Context,
func newScrapeLoop(
ctx context.Context,
sc scraper,
l log.Logger,
buffers *pool.Pool,
@ -1183,10 +1181,8 @@ func newScrapeLoop(ctx context.Context,
interval time.Duration,
timeout time.Duration,
scrapeClassicHistograms bool,
reportExtraMetrics bool,
appendMetadataToWAL bool,
target *Target,
passMetadataInContext bool,
opts *Options,
) *scrapeLoop {
if l == nil {
l = log.NewNopLogger()
@ -1197,10 +1193,13 @@ func newScrapeLoop(ctx context.Context,
if cache == nil {
cache = newScrapeCache()
}
if opts == nil {
opts = &Options{}
}
appenderCtx := ctx
if passMetadataInContext {
if opts.PassMetadataInContext {
// Store the cache and target in the context. This is then used by downstream OTel Collector
// to lookup the metadata required to process the samples. Not used by Prometheus itself.
// TODO(gouthamve) We're using a dedicated context because using the parentCtx caused a memory
@ -1210,26 +1209,26 @@ func newScrapeLoop(ctx context.Context,
}
sl := &scrapeLoop{
scraper: sc,
buffers: buffers,
cache: cache,
appender: appender,
sampleMutator: sampleMutator,
reportSampleMutator: reportSampleMutator,
stopped: make(chan struct{}),
offsetSeed: offsetSeed,
l: l,
parentCtx: ctx,
appenderCtx: appenderCtx,
honorTimestamps: honorTimestamps,
sampleLimit: sampleLimit,
bucketLimit: bucketLimit,
labelLimits: labelLimits,
interval: interval,
timeout: timeout,
scrapeClassicHistograms: scrapeClassicHistograms,
reportExtraMetrics: reportExtraMetrics,
appendMetadataToWAL: appendMetadataToWAL,
scraper: sc,
buffers: buffers,
cache: cache,
appender: appender,
sampleMutator: sampleMutator,
reportSampleMutator: reportSampleMutator,
stopped: make(chan struct{}),
offsetSeed: offsetSeed,
l: l,
parentCtx: ctx,
appenderCtx: appenderCtx,
honorTimestamps: honorTimestamps,
sampleLimit: sampleLimit,
bucketLimit: bucketLimit,
labelLimits: labelLimits,
interval: interval,
timeout: timeout,
scrapeClassicHistograms: scrapeClassicHistograms,
opts: opts,
stopAfterScrapeAttemptCh: make(chan time.Time, 1),
}
sl.ctx, sl.cancel = context.WithCancel(ctx)
@ -1237,31 +1236,32 @@ func newScrapeLoop(ctx context.Context,
}
func (sl *scrapeLoop) run(errc chan<- error) {
defer close(sl.stopAfterScrapeAttemptCh)
jitterDelayTime := sl.scraper.offset(sl.interval, sl.offsetSeed)
if sl.opts.IgnoreJitter {
jitterDelayTime = 0 * time.Second
}
select {
case <-time.After(sl.scraper.offset(sl.interval, sl.offsetSeed)):
// Continue after a scraping offset.
case <-sl.parentCtx.Done():
close(sl.stopped)
return
case <-sl.ctx.Done():
close(sl.stopped)
return
case <-sl.stopAfterScrapeAttemptCh:
sl.cancel()
case <-time.After(jitterDelayTime):
}
var last time.Time
alignedScrapeTime := time.Now().Round(0)
ticker := time.NewTicker(sl.interval)
defer ticker.Stop()
mainLoop:
for {
select {
case <-sl.parentCtx.Done():
close(sl.stopped)
return
case <-sl.ctx.Done():
break mainLoop
default:
}
// Temporary workaround for a jitter in go timers that causes disk space
// increase in TSDB.
// See https://github.com/prometheus/prometheus/issues/7846
@ -1288,6 +1288,11 @@ mainLoop:
return
case <-sl.ctx.Done():
break mainLoop
case minScrapeTime := <-sl.stopAfterScrapeAttemptCh:
sl.cancel()
if minScrapeTime.Before(last) {
break mainLoop
}
case <-ticker.C:
}
}
@ -1476,11 +1481,39 @@ func (sl *scrapeLoop) endOfRunStaleness(last time.Time, ticker *time.Ticker, int
}
}
// Stop the scraping. May still write data and stale markers after it has
// returned. Cancel the context to stop all writes.
// stop stops the scraping. The loop may still write data and stale markers after
// stop has returned. Cancel the context (e.g. via Manager.Stop()) to stop all
// writes, or if stop takes too long.
func (sl *scrapeLoop) stop() {
if sl.cancel == nil {
return
}
sl.cancel()
<-sl.stopped
sl.cancel = nil
}
// stopAfterScrapeAttempt stops scraping after ensuring the last scrape attempt
// happened after minScrapeTime. minScrapeTime can't be later than time.Now().
//
// Similar to stop, the loop may still write data and stale markers after
// stopAfterScrapeAttempt has returned. Cancel the context (e.g. via
// Manager.Stop()) to stop all writes, or if stopAfterScrapeAttempt takes too
// long.
func (sl *scrapeLoop) stopAfterScrapeAttempt(minScrapeTime time.Time) {
if sl.cancel == nil {
return
}
now := time.Now()
if minScrapeTime.After(now) {
minScrapeTime = now
}
sl.stopAfterScrapeAttemptCh <- minScrapeTime
<-sl.stopped
sl.cancel = nil
}
func (sl *scrapeLoop) disableEndOfRunStalenessMarkers() {
@ -1523,7 +1556,7 @@ func (sl *scrapeLoop) append(app storage.Appender, b []byte, contentType string,
// labelset is for a new series or the metadata for this series has just
// changed. It returns a boolean based on whether the metadata was updated.
updateMetadata := func(lset labels.Labels, isNewSeries bool) bool {
if !sl.appendMetadataToWAL {
if !sl.opts.EnableMetadataStorage {
return false
}
@ -1698,7 +1731,7 @@ loop:
e = exemplar.Exemplar{} // reset for next time round loop
}
if sl.appendMetadataToWAL && metadataChanged {
if sl.opts.EnableMetadataStorage && metadataChanged {
if _, merr := app.UpdateMetadata(ref, lset, meta); merr != nil {
// No need to fail the scrape on errors appending metadata.
level.Debug(sl.l).Log("msg", "Error when appending metadata in scrape loop", "ref", fmt.Sprintf("%d", ref), "metadata", fmt.Sprintf("%+v", meta), "err", merr)
@ -1840,7 +1873,7 @@ func (sl *scrapeLoop) report(app storage.Appender, start time.Time, duration tim
if err = sl.addReportSample(app, scrapeSeriesAddedMetricName, ts, float64(seriesAdded)); err != nil {
return
}
if sl.reportExtraMetrics {
if sl.opts.ExtraMetrics {
if err = sl.addReportSample(app, scrapeTimeoutMetricName, ts, sl.timeout.Seconds()); err != nil {
return
}
@ -1874,7 +1907,7 @@ func (sl *scrapeLoop) reportStale(app storage.Appender, start time.Time) (err er
if err = sl.addReportSample(app, scrapeSeriesAddedMetricName, ts, stale); err != nil {
return
}
if sl.reportExtraMetrics {
if sl.opts.ExtraMetrics {
if err = sl.addReportSample(app, scrapeTimeoutMetricName, ts, stale); err != nil {
return
}
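
The core of the loop change is a one-element channel that run selects on next to its
ticker. Stripped of all scrape details (worker and doWork below are made-up names for
illustration, not part of this change), the intended pattern is roughly:

    package main

    import (
        "fmt"
        "time"
    )

    // worker calls doWork on every tick. When a time t arrives on stopAfter it
    // returns right away if the last call already happened after t, and otherwise
    // performs exactly one more call before returning.
    func worker(interval time.Duration, stopAfter <-chan time.Time, doWork func()) {
        ticker := time.NewTicker(interval)
        defer ticker.Stop()

        for {
            last := time.Now()
            doWork()

            select {
            case t := <-stopAfter:
                if t.Before(last) {
                    return // The attempt above is already fresh enough.
                }
                doWork() // One final attempt, so that one happens after t.
                return
            case <-ticker.C:
            }
        }
    }

    func main() {
        stopAfter := make(chan time.Time, 1)
        done := make(chan struct{})
        go func() {
            defer close(done)
            worker(100*time.Millisecond, stopAfter, func() { fmt.Println("scrape attempt") })
        }()

        time.Sleep(250 * time.Millisecond)
        stopAfter <- time.Now() // Request one last attempt no older than now, then stop.
        <-done
    }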

View file

@ -170,6 +170,10 @@ func (l *testLoop) stop() {
l.stopFunc()
}
func (l *testLoop) stopAfterScrapeAttempt(_ time.Time) {
l.stopFunc()
}
func (l *testLoop) getCache() *scrapeCache {
return nil
}
@ -265,6 +269,7 @@ func TestScrapePoolReload(t *testing.T) {
newLoop: newLoop,
logger: nil,
client: http.DefaultClient,
opts: &Options{},
}
// Reloading a scrape pool with a new scrape configuration must stop all scrape
@ -348,6 +353,7 @@ func TestScrapePoolReloadPreserveRelabeledIntervalTimeout(t *testing.T) {
newLoop: newLoop,
logger: nil,
client: http.DefaultClient,
opts: &Options{},
}
err := sp.reload(reloadCfg)
@ -377,6 +383,7 @@ func TestScrapePoolTargetLimit(t *testing.T) {
newLoop: newLoop,
logger: log.NewNopLogger(),
client: http.DefaultClient,
opts: &Options{},
}
tgs := []*targetgroup.Group{}
@ -591,6 +598,7 @@ func TestScrapePoolScrapeLoopsStarted(t *testing.T) {
newLoop: newLoop,
logger: nil,
client: http.DefaultClient,
opts: &Options{},
}
tgs := []*targetgroup.Group{
@ -635,10 +643,8 @@ func TestScrapeLoopStopBeforeRun(t *testing.T) {
1,
0,
false,
false,
false,
nil,
false,
nil,
)
// The scrape pool synchronizes on stopping scrape loops. However, new scrape
@ -708,10 +714,8 @@ func TestScrapeLoopStop(t *testing.T) {
10*time.Millisecond,
time.Hour,
false,
false,
false,
nil,
false,
nil,
)
// Terminate loop after 2 scrapes.
@ -785,10 +789,8 @@ func TestScrapeLoopRun(t *testing.T) {
time.Second,
time.Hour,
false,
false,
false,
nil,
false,
nil,
)
// The loop must terminate during the initial offset if the context
@ -841,10 +843,8 @@ func TestScrapeLoopRun(t *testing.T) {
time.Second,
100*time.Millisecond,
false,
false,
false,
nil,
false,
nil,
)
go func() {
@ -901,10 +901,8 @@ func TestScrapeLoopForcedErr(t *testing.T) {
time.Second,
time.Hour,
false,
false,
false,
nil,
false,
nil,
)
forcedErr := fmt.Errorf("forced err")
@ -960,10 +958,8 @@ func TestScrapeLoopMetadata(t *testing.T) {
0,
0,
false,
false,
false,
nil,
false,
nil,
)
defer cancel()
@ -1018,10 +1014,8 @@ func simpleTestScrapeLoop(t testing.TB) (context.Context, *scrapeLoop) {
0,
0,
false,
false,
false,
nil,
false,
nil,
)
t.Cleanup(func() { cancel() })
@ -1079,10 +1073,8 @@ func TestScrapeLoopFailWithInvalidLabelsAfterRelabel(t *testing.T) {
0,
0,
false,
false,
false,
nil,
false,
nil,
)
slApp := sl.appender(ctx)
@ -1158,10 +1150,8 @@ func TestScrapeLoopRunCreatesStaleMarkersOnFailedScrape(t *testing.T) {
10*time.Millisecond,
time.Hour,
false,
false,
false,
nil,
false,
nil,
)
// Succeed once, several failures, then stop.
numScrapes := 0
@ -1222,10 +1212,8 @@ func TestScrapeLoopRunCreatesStaleMarkersOnParseFailure(t *testing.T) {
10*time.Millisecond,
time.Hour,
false,
false,
false,
nil,
false,
nil,
)
// Succeed once, several failures, then stop.
@ -1289,10 +1277,8 @@ func TestScrapeLoopCache(t *testing.T) {
10*time.Millisecond,
time.Hour,
false,
false,
false,
nil,
false,
nil,
)
numScrapes := 0
@ -1373,10 +1359,8 @@ func TestScrapeLoopCacheMemoryExhaustionProtection(t *testing.T) {
10*time.Millisecond,
time.Hour,
false,
false,
false,
nil,
false,
nil,
)
numScrapes := 0
@ -1488,10 +1472,8 @@ func TestScrapeLoopAppend(t *testing.T) {
0,
0,
false,
false,
false,
nil,
false,
nil,
)
now := time.Now()
@ -1579,7 +1561,7 @@ func TestScrapeLoopAppendForConflictingPrefixedLabels(t *testing.T) {
return mutateSampleLabels(l, &Target{labels: labels.FromStrings(tc.targetLabels...)}, false, nil)
},
nil,
func(ctx context.Context) storage.Appender { return app }, nil, 0, true, 0, 0, nil, 0, 0, false, false, false, nil, false,
func(ctx context.Context) storage.Appender { return app }, nil, 0, true, 0, 0, nil, 0, 0, false, nil, nil,
)
slApp := sl.appender(context.Background())
_, _, _, err := sl.append(slApp, []byte(tc.exposedLabels), "", time.Date(2000, 1, 1, 1, 0, 0, 0, time.UTC))
@ -1615,10 +1597,8 @@ func TestScrapeLoopAppendCacheEntryButErrNotFound(t *testing.T) {
0,
0,
false,
false,
false,
nil,
false,
nil,
)
fakeRef := storage.SeriesRef(1)
@ -1674,10 +1654,8 @@ func TestScrapeLoopAppendSampleLimit(t *testing.T) {
0,
0,
false,
false,
false,
nil,
false,
nil,
)
// Get the value of the Counter before performing the append.
@ -1752,10 +1730,8 @@ func TestScrapeLoop_HistogramBucketLimit(t *testing.T) {
0,
0,
false,
false,
false,
nil,
false,
nil,
)
metric := dto.Metric{}
@ -1851,10 +1827,8 @@ func TestScrapeLoop_ChangingMetricString(t *testing.T) {
0,
0,
false,
false,
false,
nil,
false,
nil,
)
now := time.Now()
@ -1900,10 +1874,8 @@ func TestScrapeLoopAppendStaleness(t *testing.T) {
0,
0,
false,
false,
false,
nil,
false,
nil,
)
now := time.Now()
@ -1952,10 +1924,8 @@ func TestScrapeLoopAppendNoStalenessIfTimestamp(t *testing.T) {
0,
0,
false,
false,
false,
nil,
false,
nil,
)
now := time.Now()
@ -2168,10 +2138,8 @@ metric: <
0,
0,
false,
false,
false,
nil,
false,
nil,
)
now := time.Now()
@ -2253,10 +2221,8 @@ func TestScrapeLoopAppendExemplarSeries(t *testing.T) {
0,
0,
false,
false,
false,
nil,
false,
nil,
)
now := time.Now()
@ -2306,10 +2272,8 @@ func TestScrapeLoopRunReportsTargetDownOnScrapeError(t *testing.T) {
10*time.Millisecond,
time.Hour,
false,
false,
false,
nil,
false,
nil,
)
scraper.scrapeFunc = func(ctx context.Context, w io.Writer) error {
@ -2343,10 +2307,8 @@ func TestScrapeLoopRunReportsTargetDownOnInvalidUTF8(t *testing.T) {
10*time.Millisecond,
time.Hour,
false,
false,
false,
nil,
false,
nil,
)
scraper.scrapeFunc = func(ctx context.Context, w io.Writer) error {
@ -2393,10 +2355,8 @@ func TestScrapeLoopAppendGracefullyIfAmendOrOutOfOrderOrOutOfBounds(t *testing.T
0,
0,
false,
false,
false,
nil,
false,
nil,
)
now := time.Unix(1, 0)
@ -2439,10 +2399,8 @@ func TestScrapeLoopOutOfBoundsTimeError(t *testing.T) {
0,
0,
false,
false,
false,
nil,
false,
nil,
)
now := time.Now().Add(20 * time.Minute)
@ -2712,10 +2670,8 @@ func TestScrapeLoop_RespectTimestamps(t *testing.T) {
0,
0,
false,
false,
false,
nil,
false,
nil,
)
now := time.Now()
@ -2754,10 +2710,8 @@ func TestScrapeLoop_DiscardTimestamps(t *testing.T) {
0,
0,
false,
false,
false,
nil,
false,
nil,
)
now := time.Now()
@ -2795,10 +2749,8 @@ func TestScrapeLoopDiscardDuplicateLabels(t *testing.T) {
0,
0,
false,
false,
false,
nil,
false,
nil,
)
defer cancel()
@ -2854,10 +2806,8 @@ func TestScrapeLoopDiscardUnnamedMetrics(t *testing.T) {
0,
0,
false,
false,
false,
nil,
false,
nil,
)
defer cancel()
@ -3118,10 +3068,8 @@ func TestScrapeAddFast(t *testing.T) {
0,
0,
false,
false,
false,
nil,
false,
nil,
)
defer cancel()
@ -3205,10 +3153,8 @@ func TestScrapeReportSingleAppender(t *testing.T) {
10*time.Millisecond,
time.Hour,
false,
false,
false,
nil,
false,
nil,
)
numScrapes := 0
@ -3408,10 +3354,8 @@ func TestScrapeLoopLabelLimit(t *testing.T) {
0,
0,
false,
false,
false,
nil,
false,
nil,
)
slApp := sl.appender(context.Background())

util/runutil/runutil.go (new file, 37 lines)
View file

@ -0,0 +1,37 @@
// Copyright 2013 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Copied from https://github.com/efficientgo/core/blob/a21078e2c723b69e05f95c65dbc5058712b4edd8/runutil/runutil.go#L39
// and adjusted.
package runutil
import "time"
// Retry executes f every interval until f returns no error or stopc is closed,
// in which case the last error from f is returned.
func Retry(interval time.Duration, stopc <-chan struct{}, f func() error) error {
tick := time.NewTicker(interval)
defer tick.Stop()
var err error
for {
if err = f(); err == nil {
return nil
}
select {
case <-stopc:
return err
case <-tick.C:
}
}
}
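
A minimal usage sketch, mirroring the scrape manager test above (conditionMet is a
hypothetical stand-in for whatever predicate is being polled):

    import (
        "context"
        "errors"
        "time"

        "github.com/prometheus/prometheus/util/runutil"
    )

    // waitFor polls conditionMet every 100ms until it holds or 5s elapse.
    func waitFor(conditionMet func() bool) error {
        ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
        defer cancel()
        return runutil.Retry(100*time.Millisecond, ctx.Done(), func() error {
            if !conditionMet() {
                return errors.New("condition not met yet")
            }
            return nil
        })
    }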