Retrieval: Do not buffer the samples if no sample limit is configured

Also, simplify and streamline the code a bit.
This commit is contained in:
beorn7 2017-01-07 17:28:49 +01:00
parent 30448286c7
commit 3610331eeb
3 changed files with 197 additions and 184 deletions

View file

@ -110,7 +110,7 @@ type scrapePool struct {
loops map[uint64]loop loops map[uint64]loop
// Constructor for new scrape loops. This is settable for testing convenience. // Constructor for new scrape loops. This is settable for testing convenience.
newLoop func(context.Context, scraper, storage.SampleAppender, func(storage.SampleAppender) storage.SampleAppender, storage.SampleAppender, uint) loop newLoop func(context.Context, scraper, storage.SampleAppender, model.LabelSet, *config.ScrapeConfig) loop
} }
func newScrapePool(ctx context.Context, cfg *config.ScrapeConfig, app storage.SampleAppender) *scrapePool { func newScrapePool(ctx context.Context, cfg *config.ScrapeConfig, app storage.SampleAppender) *scrapePool {
@ -179,7 +179,7 @@ func (sp *scrapePool) reload(cfg *config.ScrapeConfig) {
var ( var (
t = sp.targets[fp] t = sp.targets[fp]
s = &targetScraper{Target: t, client: sp.client} s = &targetScraper{Target: t, client: sp.client}
newLoop = sp.newLoop(sp.ctx, s, sp.appender, sp.sampleMutator(t), sp.reportAppender(t), sp.config.SampleLimit) newLoop = sp.newLoop(sp.ctx, s, sp.appender, t.Labels(), sp.config)
) )
wg.Add(1) wg.Add(1)
@ -240,7 +240,7 @@ func (sp *scrapePool) sync(targets []*Target) {
if _, ok := sp.targets[hash]; !ok { if _, ok := sp.targets[hash]; !ok {
s := &targetScraper{Target: t, client: sp.client} s := &targetScraper{Target: t, client: sp.client}
l := sp.newLoop(sp.ctx, s, sp.appender, sp.sampleMutator(t), sp.reportAppender(t), sp.config.SampleLimit) l := sp.newLoop(sp.ctx, s, sp.appender, t.Labels(), sp.config)
sp.targets[hash] = t sp.targets[hash] = t
sp.loops[hash] = l sp.loops[hash] = l
@ -272,41 +272,6 @@ func (sp *scrapePool) sync(targets []*Target) {
wg.Wait() wg.Wait()
} }
// sampleMutator returns a function that'll take an appender and return an appender for mutated samples.
func (sp *scrapePool) sampleMutator(target *Target) func(storage.SampleAppender) storage.SampleAppender {
return func(app storage.SampleAppender) storage.SampleAppender {
// The relabelAppender has to be inside the label-modifying appenders
// so the relabeling rules are applied to the correct label set.
if mrc := sp.config.MetricRelabelConfigs; len(mrc) > 0 {
app = relabelAppender{
SampleAppender: app,
relabelings: mrc,
}
}
if sp.config.HonorLabels {
app = honorLabelsAppender{
SampleAppender: app,
labels: target.Labels(),
}
} else {
app = ruleLabelsAppender{
SampleAppender: app,
labels: target.Labels(),
}
}
return app
}
}
// reportAppender returns an appender for reporting samples for the target.
func (sp *scrapePool) reportAppender(target *Target) storage.SampleAppender {
return ruleLabelsAppender{
SampleAppender: sp.appender,
labels: target.Labels(),
}
}
// A scraper retrieves samples and accepts a status report at the end. // A scraper retrieves samples and accepts a status report at the end.
type scraper interface { type scraper interface {
scrape(ctx context.Context, ts time.Time) (model.Samples, error) scrape(ctx context.Context, ts time.Time) (model.Samples, error)
@ -376,26 +341,32 @@ type scrapeLoop struct {
// Where samples are ultimately sent. // Where samples are ultimately sent.
appender storage.SampleAppender appender storage.SampleAppender
// Applies relabel rules and label handling.
mutator func(storage.SampleAppender) storage.SampleAppender targetLabels model.LabelSet
// For sending up and scrape_*. metricRelabelConfigs []*config.RelabelConfig
reportAppender storage.SampleAppender honorLabels bool
// Limit on number of samples that will be accepted. sampleLimit uint
sampleLimit uint
done chan struct{} done chan struct{}
ctx context.Context ctx context.Context
cancel func() cancel func()
} }
func newScrapeLoop(ctx context.Context, sc scraper, app storage.SampleAppender, mut func(storage.SampleAppender) storage.SampleAppender, reportApp storage.SampleAppender, sampleLimit uint) loop { func newScrapeLoop(
ctx context.Context,
sc scraper,
appender storage.SampleAppender,
targetLabels model.LabelSet,
config *config.ScrapeConfig,
) loop {
sl := &scrapeLoop{ sl := &scrapeLoop{
scraper: sc, scraper: sc,
appender: app, appender: appender,
mutator: mut, targetLabels: targetLabels,
reportAppender: reportApp, metricRelabelConfigs: config.MetricRelabelConfigs,
sampleLimit: sampleLimit, honorLabels: config.HonorLabels,
done: make(chan struct{}), sampleLimit: config.SampleLimit,
done: make(chan struct{}),
} }
sl.ctx, sl.cancel = context.WithCancel(ctx) sl.ctx, sl.cancel = context.WithCancel(ctx)
@ -426,8 +397,9 @@ func (sl *scrapeLoop) run(interval, timeout time.Duration, errc chan<- error) {
if !sl.appender.NeedsThrottling() { if !sl.appender.NeedsThrottling() {
var ( var (
start = time.Now() start = time.Now()
scrapeCtx, _ = context.WithTimeout(sl.ctx, timeout) scrapeCtx, _ = context.WithTimeout(sl.ctx, timeout)
numPostRelabelSamples = 0
) )
// Only record after the first scrape. // Only record after the first scrape.
@ -438,11 +410,13 @@ func (sl *scrapeLoop) run(interval, timeout time.Duration, errc chan<- error) {
} }
samples, err := sl.scraper.scrape(scrapeCtx, start) samples, err := sl.scraper.scrape(scrapeCtx, start)
err = sl.processScrapeResult(samples, err, start) if err == nil {
numPostRelabelSamples, err = sl.append(samples)
}
if err != nil && errc != nil { if err != nil && errc != nil {
errc <- err errc <- err
} }
sl.report(start, time.Since(start), len(samples), numPostRelabelSamples, err)
last = start last = start
} else { } else {
targetSkippedScrapes.WithLabelValues(interval.String()).Inc() targetSkippedScrapes.WithLabelValues(interval.String()).Inc()
@ -461,36 +435,73 @@ func (sl *scrapeLoop) stop() {
<-sl.done <-sl.done
} }
func (sl *scrapeLoop) processScrapeResult(samples model.Samples, scrapeErr error, start time.Time) error { // wrapAppender wraps a SampleAppender for relabeling. It returns the wrapped
// Collect samples post-relabelling and label handling in a buffer. // appender and an innermost countingAppender that counts the samples actually
buf := &bufferAppender{buffer: make(model.Samples, 0, len(samples))} // appended in the end.
if scrapeErr == nil { func (sl *scrapeLoop) wrapAppender(app storage.SampleAppender) (storage.SampleAppender, *countingAppender) {
app := sl.mutator(buf) // Innermost appender is a countingAppender to count how many samples
for _, sample := range samples { // are left in the end.
app.Append(sample) countingAppender := &countingAppender{
} SampleAppender: app,
}
app = countingAppender
if sl.sampleLimit > 0 && uint(len(buf.buffer)) > sl.sampleLimit { // The relabelAppender has to be inside the label-modifying appenders so
scrapeErr = fmt.Errorf("%d samples exceeded limit of %d", len(buf.buffer), sl.sampleLimit) // the relabeling rules are applied to the correct label set.
targetScrapeSampleLimit.Inc() if len(sl.metricRelabelConfigs) > 0 {
} else { app = relabelAppender{
// Send samples to storage. SampleAppender: app,
sl.append(buf.buffer) relabelings: sl.metricRelabelConfigs,
} }
} }
sl.report(start, time.Since(start), len(samples), len(buf.buffer), scrapeErr) if sl.honorLabels {
return scrapeErr app = honorLabelsAppender{
SampleAppender: app,
labels: sl.targetLabels,
}
} else {
app = ruleLabelsAppender{
SampleAppender: app,
labels: sl.targetLabels,
}
}
return app, countingAppender
} }
func (sl *scrapeLoop) append(samples model.Samples) { func (sl *scrapeLoop) append(samples model.Samples) (int, error) {
var ( var (
numOutOfOrder = 0 numOutOfOrder = 0
numDuplicates = 0 numDuplicates = 0
app = sl.appender
countingApp *countingAppender
) )
if sl.sampleLimit > 0 {
// We need to check for the sample limit, so append everything
// to a wrapped bufferAppender first. Then point samples to the
// result.
bufApp := &bufferAppender{buffer: make(model.Samples, 0, len(samples))}
var wrappedBufApp storage.SampleAppender
wrappedBufApp, countingApp = sl.wrapAppender(bufApp)
for _, s := range samples {
// Ignore errors as bufferAppender always succeeds.
wrappedBufApp.Append(s)
}
samples = bufApp.buffer
if uint(countingApp.count) > sl.sampleLimit {
targetScrapeSampleLimit.Inc()
return countingApp.count, fmt.Errorf(
"%d samples exceeded limit of %d", countingApp.count, sl.sampleLimit,
)
}
} else {
// No need to check for sample limit. Wrap sl.appender directly.
app, countingApp = sl.wrapAppender(sl.appender)
}
for _, s := range samples { for _, s := range samples {
if err := sl.appender.Append(s); err != nil { if err := app.Append(s); err != nil {
switch err { switch err {
case local.ErrOutOfOrderSample: case local.ErrOutOfOrderSample:
numOutOfOrder++ numOutOfOrder++
@ -509,6 +520,7 @@ func (sl *scrapeLoop) append(samples model.Samples) {
if numDuplicates > 0 { if numDuplicates > 0 {
log.With("numDropped", numDuplicates).Warn("Error on ingesting samples with different value but same timestamp") log.With("numDropped", numDuplicates).Warn("Error on ingesting samples with different value but same timestamp")
} }
return countingApp.count, nil
} }
func (sl *scrapeLoop) report(start time.Time, duration time.Duration, scrapedSamples, postRelabelSamples int, err error) { func (sl *scrapeLoop) report(start time.Time, duration time.Duration, scrapedSamples, postRelabelSamples int, err error) {
@ -550,16 +562,21 @@ func (sl *scrapeLoop) report(start time.Time, duration time.Duration, scrapedSam
Value: model.SampleValue(postRelabelSamples), Value: model.SampleValue(postRelabelSamples),
} }
if err := sl.reportAppender.Append(healthSample); err != nil { reportAppender := ruleLabelsAppender{
SampleAppender: sl.appender,
labels: sl.targetLabels,
}
if err := reportAppender.Append(healthSample); err != nil {
log.With("sample", healthSample).With("error", err).Warn("Scrape health sample discarded") log.With("sample", healthSample).With("error", err).Warn("Scrape health sample discarded")
} }
if err := sl.reportAppender.Append(durationSample); err != nil { if err := reportAppender.Append(durationSample); err != nil {
log.With("sample", durationSample).With("error", err).Warn("Scrape duration sample discarded") log.With("sample", durationSample).With("error", err).Warn("Scrape duration sample discarded")
} }
if err := sl.reportAppender.Append(countSample); err != nil { if err := reportAppender.Append(countSample); err != nil {
log.With("sample", durationSample).With("error", err).Warn("Scrape sample count sample discarded") log.With("sample", durationSample).With("error", err).Warn("Scrape sample count sample discarded")
} }
if err := sl.reportAppender.Append(postRelabelSample); err != nil { if err := reportAppender.Append(postRelabelSample); err != nil {
log.With("sample", durationSample).With("error", err).Warn("Scrape sample count post-relabelling sample discarded") log.With("sample", durationSample).With("error", err).Warn("Scrape sample count post-relabeling sample discarded")
} }
} }

View file

@ -139,7 +139,7 @@ func TestScrapePoolReload(t *testing.T) {
} }
// On starting to run, new loops created on reload check whether their preceding // On starting to run, new loops created on reload check whether their preceding
// equivalents have been stopped. // equivalents have been stopped.
newLoop := func(ctx context.Context, s scraper, app storage.SampleAppender, mut func(storage.SampleAppender) storage.SampleAppender, reportApp storage.SampleAppender, sampleLimit uint) loop { newLoop := func(ctx context.Context, s scraper, app storage.SampleAppender, tl model.LabelSet, cfg *config.ScrapeConfig) loop {
l := &testLoop{} l := &testLoop{}
l.startFunc = func(interval, timeout time.Duration, errc chan<- error) { l.startFunc = func(interval, timeout time.Duration, errc chan<- error) {
if interval != 3*time.Second { if interval != 3*time.Second {
@ -222,44 +222,19 @@ func TestScrapePoolReload(t *testing.T) {
} }
} }
func TestScrapePoolReportAppender(t *testing.T) { func TestScrapeLoopWrapSampleAppender(t *testing.T) {
cfg := &config.ScrapeConfig{ cfg := &config.ScrapeConfig{
MetricRelabelConfigs: []*config.RelabelConfig{ MetricRelabelConfigs: []*config.RelabelConfig{
{}, {}, {}, {
}, Action: config.RelabelDrop,
} SourceLabels: model.LabelNames{"__name__"},
target := newTestTarget("example.com:80", 10*time.Millisecond, nil) Regex: config.MustNewRegexp("does_not_match_.*"),
app := &nopAppender{} },
{
sp := newScrapePool(context.Background(), cfg, app) Action: config.RelabelDrop,
SourceLabels: model.LabelNames{"__name__"},
cfg.HonorLabels = false Regex: config.MustNewRegexp("does_not_match_either_*"),
wrapped := sp.reportAppender(target) },
rl, ok := wrapped.(ruleLabelsAppender)
if !ok {
t.Fatalf("Expected ruleLabelsAppender but got %T", wrapped)
}
if rl.SampleAppender != app {
t.Fatalf("Expected base appender but got %T", rl.SampleAppender)
}
cfg.HonorLabels = true
wrapped = sp.reportAppender(target)
hl, ok := wrapped.(ruleLabelsAppender)
if !ok {
t.Fatalf("Expected ruleLabelsAppender but got %T", wrapped)
}
if hl.SampleAppender != app {
t.Fatalf("Expected base appender but got %T", hl.SampleAppender)
}
}
func TestScrapePoolSampleAppender(t *testing.T) {
cfg := &config.ScrapeConfig{
MetricRelabelConfigs: []*config.RelabelConfig{
{}, {}, {},
}, },
} }
@ -269,7 +244,20 @@ func TestScrapePoolSampleAppender(t *testing.T) {
sp := newScrapePool(context.Background(), cfg, app) sp := newScrapePool(context.Background(), cfg, app)
cfg.HonorLabels = false cfg.HonorLabels = false
wrapped := sp.sampleMutator(target)(app)
sl := sp.newLoop(
sp.ctx,
&targetScraper{Target: target, client: sp.client},
sp.appender,
target.Labels(),
sp.config,
).(*scrapeLoop)
wrapped, counting := sl.wrapAppender(sl.appender)
wrapped.Append(&model.Sample{})
if counting.count != 1 {
t.Errorf("Expected count of 1, got %d", counting.count)
}
rl, ok := wrapped.(ruleLabelsAppender) rl, ok := wrapped.(ruleLabelsAppender)
if !ok { if !ok {
@ -279,12 +267,28 @@ func TestScrapePoolSampleAppender(t *testing.T) {
if !ok { if !ok {
t.Fatalf("Expected relabelAppender but got %T", rl.SampleAppender) t.Fatalf("Expected relabelAppender but got %T", rl.SampleAppender)
} }
if re.SampleAppender != app { co, ok := re.SampleAppender.(*countingAppender)
t.Fatalf("Expected base appender but got %T", re.SampleAppender) if !ok {
t.Fatalf("Expected *countingAppender but got %T", re.SampleAppender)
}
if co.SampleAppender != app {
t.Fatalf("Expected base appender but got %T", co.SampleAppender)
} }
cfg.HonorLabels = true cfg.HonorLabels = true
wrapped = sp.sampleMutator(target)(app) sl = sp.newLoop(
sp.ctx,
&targetScraper{Target: target, client: sp.client},
sp.appender,
target.Labels(),
sp.config,
).(*scrapeLoop)
wrapped, counting = sl.wrapAppender(sl.appender)
wrapped.Append(&model.Sample{})
if counting.count != 1 {
t.Errorf("Expected count of 1, got %d", counting.count)
}
hl, ok := wrapped.(honorLabelsAppender) hl, ok := wrapped.(honorLabelsAppender)
if !ok { if !ok {
@ -294,8 +298,12 @@ func TestScrapePoolSampleAppender(t *testing.T) {
if !ok { if !ok {
t.Fatalf("Expected relabelAppender but got %T", hl.SampleAppender) t.Fatalf("Expected relabelAppender but got %T", hl.SampleAppender)
} }
if re.SampleAppender != app { co, ok = re.SampleAppender.(*countingAppender)
t.Fatalf("Expected base appender but got %T", re.SampleAppender) if !ok {
t.Fatalf("Expected *countingAppender but got %T", re.SampleAppender)
}
if co.SampleAppender != app {
t.Fatalf("Expected base appender but got %T", co.SampleAppender)
} }
} }
@ -310,15 +318,14 @@ func TestScrapeLoopSampleProcessing(t *testing.T) {
} }
testCases := []struct { testCases := []struct {
scrapedSamples model.Samples scrapedSamples model.Samples
scrapeError error scrapeConfig *config.ScrapeConfig
scrapeConfig config.ScrapeConfig expectedReportedSamples model.Samples
expectedReportedSamples model.Samples expectedPostRelabelSamplesCount int
expectedIngestedSamplesCount int
}{ }{
{ { // 0
scrapedSamples: readSamples, scrapedSamples: readSamples,
scrapeError: nil, scrapeConfig: &config.ScrapeConfig{},
expectedReportedSamples: model.Samples{ expectedReportedSamples: model.Samples{
{ {
Metric: model.Metric{"__name__": "up"}, Metric: model.Metric{"__name__": "up"},
@ -326,6 +333,7 @@ func TestScrapeLoopSampleProcessing(t *testing.T) {
}, },
{ {
Metric: model.Metric{"__name__": "scrape_duration_seconds"}, Metric: model.Metric{"__name__": "scrape_duration_seconds"},
Value: 42,
}, },
{ {
Metric: model.Metric{"__name__": "scrape_samples_scraped"}, Metric: model.Metric{"__name__": "scrape_samples_scraped"},
@ -336,12 +344,11 @@ func TestScrapeLoopSampleProcessing(t *testing.T) {
Value: 2, Value: 2,
}, },
}, },
expectedIngestedSamplesCount: 2, expectedPostRelabelSamplesCount: 2,
}, },
{ { // 1
scrapedSamples: readSamples, scrapedSamples: readSamples,
scrapeError: nil, scrapeConfig: &config.ScrapeConfig{
scrapeConfig: config.ScrapeConfig{
MetricRelabelConfigs: []*config.RelabelConfig{ MetricRelabelConfigs: []*config.RelabelConfig{
{ {
Action: config.RelabelDrop, Action: config.RelabelDrop,
@ -357,6 +364,7 @@ func TestScrapeLoopSampleProcessing(t *testing.T) {
}, },
{ {
Metric: model.Metric{"__name__": "scrape_duration_seconds"}, Metric: model.Metric{"__name__": "scrape_duration_seconds"},
Value: 42,
}, },
{ {
Metric: model.Metric{"__name__": "scrape_samples_scraped"}, Metric: model.Metric{"__name__": "scrape_samples_scraped"},
@ -367,12 +375,11 @@ func TestScrapeLoopSampleProcessing(t *testing.T) {
Value: 1, Value: 1,
}, },
}, },
expectedIngestedSamplesCount: 1, expectedPostRelabelSamplesCount: 1,
}, },
{ { // 2
scrapedSamples: readSamples, scrapedSamples: readSamples,
scrapeError: nil, scrapeConfig: &config.ScrapeConfig{
scrapeConfig: config.ScrapeConfig{
SampleLimit: 1, SampleLimit: 1,
MetricRelabelConfigs: []*config.RelabelConfig{ MetricRelabelConfigs: []*config.RelabelConfig{
{ {
@ -389,6 +396,7 @@ func TestScrapeLoopSampleProcessing(t *testing.T) {
}, },
{ {
Metric: model.Metric{"__name__": "scrape_duration_seconds"}, Metric: model.Metric{"__name__": "scrape_duration_seconds"},
Value: 42,
}, },
{ {
Metric: model.Metric{"__name__": "scrape_samples_scraped"}, Metric: model.Metric{"__name__": "scrape_samples_scraped"},
@ -399,12 +407,11 @@ func TestScrapeLoopSampleProcessing(t *testing.T) {
Value: 1, Value: 1,
}, },
}, },
expectedIngestedSamplesCount: 1, expectedPostRelabelSamplesCount: 1,
}, },
{ { // 3
scrapedSamples: readSamples, scrapedSamples: readSamples,
scrapeError: nil, scrapeConfig: &config.ScrapeConfig{
scrapeConfig: config.ScrapeConfig{
SampleLimit: 1, SampleLimit: 1,
}, },
expectedReportedSamples: model.Samples{ expectedReportedSamples: model.Samples{
@ -414,6 +421,7 @@ func TestScrapeLoopSampleProcessing(t *testing.T) {
}, },
{ {
Metric: model.Metric{"__name__": "scrape_duration_seconds"}, Metric: model.Metric{"__name__": "scrape_duration_seconds"},
Value: 42,
}, },
{ {
Metric: model.Metric{"__name__": "scrape_samples_scraped"}, Metric: model.Metric{"__name__": "scrape_samples_scraped"},
@ -424,53 +432,31 @@ func TestScrapeLoopSampleProcessing(t *testing.T) {
Value: 2, Value: 2,
}, },
}, },
expectedIngestedSamplesCount: 0, expectedPostRelabelSamplesCount: 2,
},
{
scrapedSamples: model.Samples{},
scrapeError: fmt.Errorf("error"),
expectedReportedSamples: model.Samples{
{
Metric: model.Metric{"__name__": "up"},
Value: 0,
},
{
Metric: model.Metric{"__name__": "scrape_duration_seconds"},
},
{
Metric: model.Metric{"__name__": "scrape_samples_scraped"},
Value: 0,
},
{
Metric: model.Metric{"__name__": "scrape_samples_post_metric_relabeling"},
Value: 0,
},
},
expectedIngestedSamplesCount: 0,
}, },
} }
for i, test := range testCases { for i, test := range testCases {
ingestedSamples := &bufferAppender{buffer: model.Samples{}} ingestedSamples := &bufferAppender{buffer: model.Samples{}}
reportedSamples := &bufferAppender{buffer: model.Samples{}}
target := newTestTarget("example.com:80", 10*time.Millisecond, nil) target := newTestTarget("example.com:80", 10*time.Millisecond, nil)
sp := newScrapePool(context.Background(), &test.scrapeConfig, ingestedSamples)
scraper := &testScraper{} scraper := &testScraper{}
sl := newScrapeLoop(context.Background(), scraper, ingestedSamples, sp.sampleMutator(target), reportedSamples, test.scrapeConfig.SampleLimit).(*scrapeLoop) sl := newScrapeLoop(context.Background(), scraper, ingestedSamples, target.Labels(), test.scrapeConfig).(*scrapeLoop)
sl.processScrapeResult(test.scrapedSamples, test.scrapeError, time.Unix(0, 0)) num, err := sl.append(test.scrapedSamples)
sl.report(time.Unix(0, 0), 42*time.Second, len(test.scrapedSamples), num, err)
reportedSamples := ingestedSamples.buffer
if err == nil {
reportedSamples = reportedSamples[num:]
}
// Ignore value of scrape_duration_seconds, as it's time dependant. if !reflect.DeepEqual(reportedSamples, test.expectedReportedSamples) {
reportedSamples.buffer[1].Value = 0
if !reflect.DeepEqual(reportedSamples.buffer, test.expectedReportedSamples) {
t.Errorf("Reported samples did not match expected metrics for case %d", i) t.Errorf("Reported samples did not match expected metrics for case %d", i)
t.Errorf("Expected: %v", test.expectedReportedSamples) t.Errorf("Expected: %v", test.expectedReportedSamples)
t.Fatalf("Got: %v", reportedSamples.buffer) t.Fatalf("Got: %v", reportedSamples)
} }
if test.expectedIngestedSamplesCount != len(ingestedSamples.buffer) { if test.expectedPostRelabelSamplesCount != num {
t.Fatalf("Ingested samples %d did not match expected value %d", len(ingestedSamples.buffer), test.expectedIngestedSamplesCount) t.Fatalf("Case %d: Ingested samples %d did not match expected value %d", i, num, test.expectedPostRelabelSamplesCount)
} }
} }
@ -478,10 +464,10 @@ func TestScrapeLoopSampleProcessing(t *testing.T) {
func TestScrapeLoopStop(t *testing.T) { func TestScrapeLoopStop(t *testing.T) {
scraper := &testScraper{} scraper := &testScraper{}
sl := newScrapeLoop(context.Background(), scraper, nil, nil, nil, 0) sl := newScrapeLoop(context.Background(), scraper, nil, nil, &config.ScrapeConfig{})
// The scrape pool synchronizes on stopping scrape loops. However, new scrape // The scrape pool synchronizes on stopping scrape loops. However, new scrape
// loops are syarted asynchronously. Thus it's possible, that a loop is stopped // loops are started asynchronously. Thus it's possible, that a loop is stopped
// again before having started properly. // again before having started properly.
// Stopping not-yet-started loops must block until the run method was called and exited. // Stopping not-yet-started loops must block until the run method was called and exited.
// The run method must exit immediately. // The run method must exit immediately.
@ -528,15 +514,13 @@ func TestScrapeLoopRun(t *testing.T) {
signal = make(chan struct{}) signal = make(chan struct{})
errc = make(chan error) errc = make(chan error)
scraper = &testScraper{} scraper = &testScraper{}
app = &nopAppender{} app = &nopAppender{}
mut = func(storage.SampleAppender) storage.SampleAppender { return &nopAppender{} }
reportApp = &nopAppender{}
) )
defer close(signal) defer close(signal)
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
sl := newScrapeLoop(ctx, scraper, app, mut, reportApp, 0) sl := newScrapeLoop(ctx, scraper, app, nil, &config.ScrapeConfig{})
// The loop must terminate during the initial offset if the context // The loop must terminate during the initial offset if the context
// is canceled. // is canceled.
@ -574,7 +558,7 @@ func TestScrapeLoopRun(t *testing.T) {
} }
ctx, cancel = context.WithCancel(context.Background()) ctx, cancel = context.WithCancel(context.Background())
sl = newScrapeLoop(ctx, scraper, app, mut, reportApp, 0) sl = newScrapeLoop(ctx, scraper, app, nil, &config.ScrapeConfig{})
go func() { go func() {
sl.run(time.Second, 100*time.Millisecond, errc) sl.run(time.Second, 100*time.Millisecond, errc)

View file

@ -278,9 +278,8 @@ func (app relabelAppender) Append(s *model.Sample) error {
return app.SampleAppender.Append(s) return app.SampleAppender.Append(s)
} }
// Appends samples to the given buffer. // bufferAppender appends samples to the given buffer.
type bufferAppender struct { type bufferAppender struct {
storage.SampleAppender
buffer model.Samples buffer model.Samples
} }
@ -289,6 +288,19 @@ func (app *bufferAppender) Append(s *model.Sample) error {
return nil return nil
} }
func (app *bufferAppender) NeedsThrottling() bool { return false }
// countingAppender counts the samples appended to the underlying appender.
type countingAppender struct {
storage.SampleAppender
count int
}
func (app *countingAppender) Append(s *model.Sample) error {
app.count++
return app.SampleAppender.Append(s)
}
// populateLabels builds a label set from the given label set and scrape configuration. // populateLabels builds a label set from the given label set and scrape configuration.
// It returns a label set before relabeling was applied as the second return value. // It returns a label set before relabeling was applied as the second return value.
// Returns a nil label set if the target is dropped during relabeling. // Returns a nil label set if the target is dropped during relabeling.