mirror of
https://github.com/prometheus/prometheus.git
synced 2024-11-12 16:44:05 -08:00
Merge pull request #2787 from prometheus/limit2
Rework sample limit to work for 2.0
This commit is contained in:
commit
eb651233ac
|
@ -602,10 +602,10 @@ mainLoop:
|
||||||
// A failed scrape is the same as an empty scrape,
|
// A failed scrape is the same as an empty scrape,
|
||||||
// we still call sl.append to trigger stale markers.
|
// we still call sl.append to trigger stale markers.
|
||||||
if total, added, err = sl.append(b, start); err != nil {
|
if total, added, err = sl.append(b, start); err != nil {
|
||||||
sl.l.With("err", err).Error("append failed")
|
sl.l.With("err", err).Warn("append failed")
|
||||||
// The append failed, probably due to a parse error.
|
// The append failed, probably due to a parse error or sample limit.
|
||||||
// Call sl.append again with an empty scrape to trigger stale markers.
|
// Call sl.append again with an empty scrape to trigger stale markers.
|
||||||
if _, _, err = sl.append([]byte{}, start); err != nil {
|
if _, _, err := sl.append([]byte{}, start); err != nil {
|
||||||
sl.l.With("err", err).Error("append failed")
|
sl.l.With("err", err).Error("append failed")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -712,6 +712,7 @@ func (sl *scrapeLoop) append(b []byte, ts time.Time) (total, added int, err erro
|
||||||
numOutOfOrder = 0
|
numOutOfOrder = 0
|
||||||
numDuplicates = 0
|
numDuplicates = 0
|
||||||
)
|
)
|
||||||
|
var sampleLimitErr error
|
||||||
|
|
||||||
loop:
|
loop:
|
||||||
for p.Next() {
|
for p.Next() {
|
||||||
|
@ -743,6 +744,12 @@ loop:
|
||||||
numDuplicates++
|
numDuplicates++
|
||||||
sl.l.With("timeseries", string(met)).Debug("Duplicate sample for timestamp")
|
sl.l.With("timeseries", string(met)).Debug("Duplicate sample for timestamp")
|
||||||
continue
|
continue
|
||||||
|
case errSampleLimit:
|
||||||
|
// Keep on parsing output if we hit the limit, so we report the correct
|
||||||
|
// total number of samples scraped.
|
||||||
|
sampleLimitErr = err
|
||||||
|
added++
|
||||||
|
continue
|
||||||
default:
|
default:
|
||||||
break loop
|
break loop
|
||||||
}
|
}
|
||||||
|
@ -769,6 +776,10 @@ loop:
|
||||||
numDuplicates++
|
numDuplicates++
|
||||||
sl.l.With("timeseries", string(met)).Debug("Duplicate sample for timestamp")
|
sl.l.With("timeseries", string(met)).Debug("Duplicate sample for timestamp")
|
||||||
continue
|
continue
|
||||||
|
case errSampleLimit:
|
||||||
|
sampleLimitErr = err
|
||||||
|
added++
|
||||||
|
continue
|
||||||
default:
|
default:
|
||||||
break loop
|
break loop
|
||||||
}
|
}
|
||||||
|
@ -785,6 +796,10 @@ loop:
|
||||||
if err == nil {
|
if err == nil {
|
||||||
err = p.Err()
|
err = p.Err()
|
||||||
}
|
}
|
||||||
|
if err == nil && sampleLimitErr != nil {
|
||||||
|
targetScrapeSampleLimit.Inc()
|
||||||
|
err = sampleLimitErr
|
||||||
|
}
|
||||||
if numOutOfOrder > 0 {
|
if numOutOfOrder > 0 {
|
||||||
sl.l.With("numDropped", numOutOfOrder).Warn("Error on ingesting out-of-order samples")
|
sl.l.With("numDropped", numOutOfOrder).Warn("Error on ingesting out-of-order samples")
|
||||||
}
|
}
|
||||||
|
@ -808,10 +823,10 @@ loop:
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
app.Rollback()
|
app.Rollback()
|
||||||
return total, 0, err
|
return total, added, err
|
||||||
}
|
}
|
||||||
if err := app.Commit(); err != nil {
|
if err := app.Commit(); err != nil {
|
||||||
return total, 0, err
|
return total, added, err
|
||||||
}
|
}
|
||||||
|
|
||||||
sl.cache.iterDone()
|
sl.cache.iterDone()
|
||||||
|
|
|
@ -711,6 +711,116 @@ func TestScrapeLoopAppendNoStalenessIfTimestamp(t *testing.T) {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestScrapeLoopRunAppliesScrapeLimit(t *testing.T) {
|
||||||
|
|
||||||
|
cases := []struct {
|
||||||
|
appender func() storage.Appender
|
||||||
|
up float64
|
||||||
|
scrapeSamplesScraped float64
|
||||||
|
scrapeSamplesScrapedPostMetricRelabelling float64
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
appender: func() storage.Appender { return nopAppender{} },
|
||||||
|
up: 1,
|
||||||
|
scrapeSamplesScraped: 3,
|
||||||
|
scrapeSamplesScrapedPostMetricRelabelling: 3,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
appender: func() storage.Appender {
|
||||||
|
return &limitAppender{Appender: nopAppender{}, limit: 3}
|
||||||
|
},
|
||||||
|
up: 1,
|
||||||
|
scrapeSamplesScraped: 3,
|
||||||
|
scrapeSamplesScrapedPostMetricRelabelling: 3,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
appender: func() storage.Appender {
|
||||||
|
return &limitAppender{Appender: nopAppender{}, limit: 2}
|
||||||
|
},
|
||||||
|
up: 0,
|
||||||
|
scrapeSamplesScraped: 3,
|
||||||
|
scrapeSamplesScrapedPostMetricRelabelling: 3,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
appender: func() storage.Appender {
|
||||||
|
return &relabelAppender{
|
||||||
|
Appender: &limitAppender{Appender: nopAppender{}, limit: 2},
|
||||||
|
relabelings: []*config.RelabelConfig{
|
||||||
|
&config.RelabelConfig{
|
||||||
|
SourceLabels: model.LabelNames{"__name__"},
|
||||||
|
Regex: config.MustNewRegexp("a"),
|
||||||
|
Action: config.RelabelDrop,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
},
|
||||||
|
up: 1,
|
||||||
|
scrapeSamplesScraped: 3,
|
||||||
|
scrapeSamplesScrapedPostMetricRelabelling: 2,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for i, c := range cases {
|
||||||
|
reportAppender := &collectResultAppender{}
|
||||||
|
var (
|
||||||
|
signal = make(chan struct{})
|
||||||
|
scraper = &testScraper{}
|
||||||
|
numScrapes = 0
|
||||||
|
reportApp = func() storage.Appender {
|
||||||
|
// Get result of the 2nd scrape.
|
||||||
|
if numScrapes == 2 {
|
||||||
|
return reportAppender
|
||||||
|
} else {
|
||||||
|
return nopAppender{}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
)
|
||||||
|
defer close(signal)
|
||||||
|
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
sl := newScrapeLoop(ctx, scraper, c.appender, reportApp, nil)
|
||||||
|
|
||||||
|
// Setup a series to be stale, then 3 samples, then stop.
|
||||||
|
scraper.scrapeFunc = func(ctx context.Context, w io.Writer) error {
|
||||||
|
numScrapes += 1
|
||||||
|
if numScrapes == 1 {
|
||||||
|
w.Write([]byte("stale 0\n"))
|
||||||
|
return nil
|
||||||
|
} else if numScrapes == 2 {
|
||||||
|
w.Write([]byte("a 0\nb 0\nc 0 \n"))
|
||||||
|
return nil
|
||||||
|
} else if numScrapes == 3 {
|
||||||
|
cancel()
|
||||||
|
}
|
||||||
|
return fmt.Errorf("Scrape failed.")
|
||||||
|
}
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
sl.run(10*time.Millisecond, time.Hour, nil)
|
||||||
|
signal <- struct{}{}
|
||||||
|
}()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-signal:
|
||||||
|
case <-time.After(5 * time.Second):
|
||||||
|
t.Fatalf("Scrape wasn't stopped.")
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(reportAppender.result) != 4 {
|
||||||
|
t.Fatalf("Case %d appended report samples not as expected. Wanted: %d samples Got: %d", i, 4, len(reportAppender.result))
|
||||||
|
}
|
||||||
|
if reportAppender.result[0].v != c.up {
|
||||||
|
t.Fatalf("Case %d appended up sample not as expected. Wanted: %f Got: %+v", i, c.up, reportAppender.result[0])
|
||||||
|
}
|
||||||
|
if reportAppender.result[2].v != c.scrapeSamplesScraped {
|
||||||
|
t.Fatalf("Case %d appended scrape_samples_scraped sample not as expected. Wanted: %f Got: %+v", i, c.scrapeSamplesScraped, reportAppender.result[2])
|
||||||
|
}
|
||||||
|
if reportAppender.result[3].v != c.scrapeSamplesScrapedPostMetricRelabelling {
|
||||||
|
t.Fatalf("Case %d appended scrape_samples_scraped_post_metric_relabeling sample not as expected. Wanted: %f Got: %+v", i, c.scrapeSamplesScrapedPostMetricRelabelling, reportAppender.result[3])
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
type errorAppender struct {
|
type errorAppender struct {
|
||||||
collectResultAppender
|
collectResultAppender
|
||||||
}
|
}
|
||||||
|
|
|
@ -30,6 +30,7 @@ import (
|
||||||
"github.com/prometheus/prometheus/config"
|
"github.com/prometheus/prometheus/config"
|
||||||
"github.com/prometheus/prometheus/pkg/labels"
|
"github.com/prometheus/prometheus/pkg/labels"
|
||||||
"github.com/prometheus/prometheus/pkg/relabel"
|
"github.com/prometheus/prometheus/pkg/relabel"
|
||||||
|
"github.com/prometheus/prometheus/pkg/value"
|
||||||
"github.com/prometheus/prometheus/storage"
|
"github.com/prometheus/prometheus/storage"
|
||||||
"github.com/prometheus/prometheus/util/httputil"
|
"github.com/prometheus/prometheus/util/httputil"
|
||||||
)
|
)
|
||||||
|
@ -228,6 +229,8 @@ func (ts Targets) Len() int { return len(ts) }
|
||||||
func (ts Targets) Less(i, j int) bool { return ts[i].URL().String() < ts[j].URL().String() }
|
func (ts Targets) Less(i, j int) bool { return ts[i].URL().String() < ts[j].URL().String() }
|
||||||
func (ts Targets) Swap(i, j int) { ts[i], ts[j] = ts[j], ts[i] }
|
func (ts Targets) Swap(i, j int) { ts[i], ts[j] = ts[j], ts[i] }
|
||||||
|
|
||||||
|
var errSampleLimit = errors.New("sample limit exceeded")
|
||||||
|
|
||||||
// limitAppender limits the number of total appended samples in a batch.
|
// limitAppender limits the number of total appended samples in a batch.
|
||||||
type limitAppender struct {
|
type limitAppender struct {
|
||||||
storage.Appender
|
storage.Appender
|
||||||
|
@ -237,26 +240,29 @@ type limitAppender struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (app *limitAppender) Add(lset labels.Labels, t int64, v float64) (string, error) {
|
func (app *limitAppender) Add(lset labels.Labels, t int64, v float64) (string, error) {
|
||||||
if app.i+1 > app.limit {
|
if !value.IsStaleNaN(v) {
|
||||||
return "", fmt.Errorf("sample limit of %d exceeded", app.limit)
|
app.i++
|
||||||
|
if app.i > app.limit {
|
||||||
|
return "", errSampleLimit
|
||||||
|
}
|
||||||
}
|
}
|
||||||
ref, err := app.Appender.Add(lset, t, v)
|
ref, err := app.Appender.Add(lset, t, v)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
app.i++
|
|
||||||
return ref, nil
|
return ref, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (app *limitAppender) AddFast(ref string, t int64, v float64) error {
|
func (app *limitAppender) AddFast(ref string, t int64, v float64) error {
|
||||||
if app.i+1 > app.limit {
|
if !value.IsStaleNaN(v) {
|
||||||
return fmt.Errorf("sample limit of %d exceeded", app.limit)
|
app.i++
|
||||||
|
if app.i > app.limit {
|
||||||
|
return errSampleLimit
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := app.Appender.AddFast(ref, t, v); err != nil {
|
if err := app.Appender.AddFast(ref, t, v); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
app.i++
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue