Treat bucket limit like sample limit and make it fail the whole scrape and return an error

Signed-off-by: Jeanette Tan <jeanette.tan@grafana.com>
This commit is contained in:
Jeanette Tan 2023-04-22 03:14:19 +08:00
parent 071426f72f
commit 2ad39baa72
6 changed files with 37 additions and 28 deletions

View file

@ -489,8 +489,8 @@ type ScrapeConfig struct {
// More than this label value length post metric-relabeling will cause the // More than this label value length post metric-relabeling will cause the
// scrape to fail. // scrape to fail.
LabelValueLengthLimit uint `yaml:"label_value_length_limit,omitempty"` LabelValueLengthLimit uint `yaml:"label_value_length_limit,omitempty"`
// More than this many buckets in a native histogram will cause the histogram // More than this many buckets in a native histogram will cause the scrape to
// to be ignored, but it will not make the whole scrape fail. // fail.
NativeHistogramBucketLimit uint `yaml:"bucket_limit,omitempty"` NativeHistogramBucketLimit uint `yaml:"bucket_limit,omitempty"`
// We cannot do proper Go type embedding below as the parser will then parse // We cannot do proper Go type embedding below as the parser will then parse

View file

@ -378,8 +378,8 @@ metric_relabel_configs:
[ target_limit: <int> | default = 0 ] [ target_limit: <int> | default = 0 ]
# Limit on total number of positive and negative buckets allowed in a native # Limit on total number of positive and negative buckets allowed in a native
# histogram. If this is exceeded, the histogram will be ignored, but this will # histogram. If this is exceeded, the entire scrape will be treated as failed.
# not make the scrape fail. 0 means no limit. # 0 means no limit.
[ sample_limit: <int> | default = 0 ] [ sample_limit: <int> | default = 0 ]
``` ```

View file

@ -194,7 +194,7 @@ var (
targetScrapeNativeHistogramBucketLimit = prometheus.NewCounter( targetScrapeNativeHistogramBucketLimit = prometheus.NewCounter(
prometheus.CounterOpts{ prometheus.CounterOpts{
Name: "prometheus_target_scrapes_histogram_exceeded_bucket_limit_total", Name: "prometheus_target_scrapes_histogram_exceeded_bucket_limit_total",
Help: "Total number of native histograms rejected due to exceeding the bucket limit.", Help: "Total number of scrapes that hit the native histograms bucket limit and were rejected.",
}, },
) )
) )
@ -1489,7 +1489,6 @@ type appendErrors struct {
numDuplicates int numDuplicates int
numOutOfBounds int numOutOfBounds int
numExemplarOutOfOrder int numExemplarOutOfOrder int
numHistogramBucketLimit int
} }
func (sl *scrapeLoop) append(app storage.Appender, b []byte, contentType string, ts time.Time) (total, added, seriesAdded int, err error) { func (sl *scrapeLoop) append(app storage.Appender, b []byte, contentType string, ts time.Time) (total, added, seriesAdded int, err error) {
@ -1506,6 +1505,7 @@ func (sl *scrapeLoop) append(app storage.Appender, b []byte, contentType string,
defTime = timestamp.FromTime(ts) defTime = timestamp.FromTime(ts)
appErrs = appendErrors{} appErrs = appendErrors{}
sampleLimitErr error sampleLimitErr error
bucketLimitErr error
e exemplar.Exemplar // escapes to heap so hoisted out of loop e exemplar.Exemplar // escapes to heap so hoisted out of loop
meta metadata.Metadata meta metadata.Metadata
metadataChanged bool metadataChanged bool
@ -1655,7 +1655,7 @@ loop:
} else { } else {
ref, err = app.Append(ref, lset, t, val) ref, err = app.Append(ref, lset, t, val)
} }
sampleAdded, err = sl.checkAddError(ce, met, parsedTimestamp, err, &sampleLimitErr, &appErrs) sampleAdded, err = sl.checkAddError(ce, met, parsedTimestamp, err, &sampleLimitErr, &bucketLimitErr, &appErrs)
if err != nil { if err != nil {
if err != storage.ErrNotFound { if err != storage.ErrNotFound {
level.Debug(sl.l).Log("msg", "Unexpected error", "series", string(met), "err", err) level.Debug(sl.l).Log("msg", "Unexpected error", "series", string(met), "err", err)
@ -1669,7 +1669,7 @@ loop:
sl.cache.trackStaleness(hash, lset) sl.cache.trackStaleness(hash, lset)
} }
sl.cache.addRef(met, ref, lset, hash) sl.cache.addRef(met, ref, lset, hash)
if sampleAdded && sampleLimitErr == nil { if sampleAdded && sampleLimitErr == nil && bucketLimitErr == nil {
seriesAdded++ seriesAdded++
} }
} }
@ -1705,6 +1705,13 @@ loop:
// We only want to increment this once per scrape, so this is Inc'd outside the loop. // We only want to increment this once per scrape, so this is Inc'd outside the loop.
targetScrapeSampleLimit.Inc() targetScrapeSampleLimit.Inc()
} }
if bucketLimitErr != nil {
if err == nil {
err = bucketLimitErr // if sample limit is hit, that error takes precedence
}
// We only want to increment this once per scrape, so this is Inc'd outside the loop.
targetScrapeNativeHistogramBucketLimit.Inc()
}
if appErrs.numOutOfOrder > 0 { if appErrs.numOutOfOrder > 0 {
level.Warn(sl.l).Log("msg", "Error on ingesting out-of-order samples", "num_dropped", appErrs.numOutOfOrder) level.Warn(sl.l).Log("msg", "Error on ingesting out-of-order samples", "num_dropped", appErrs.numOutOfOrder)
} }
@ -1717,9 +1724,6 @@ loop:
if appErrs.numExemplarOutOfOrder > 0 { if appErrs.numExemplarOutOfOrder > 0 {
level.Warn(sl.l).Log("msg", "Error on ingesting out-of-order exemplars", "num_dropped", appErrs.numExemplarOutOfOrder) level.Warn(sl.l).Log("msg", "Error on ingesting out-of-order exemplars", "num_dropped", appErrs.numExemplarOutOfOrder)
} }
if appErrs.numHistogramBucketLimit > 0 {
level.Warn(sl.l).Log("msg", "Error on ingesting native histograms that exceeded bucket limit", "num_dropped", appErrs.numHistogramBucketLimit)
}
if err == nil { if err == nil {
sl.cache.forEachStale(func(lset labels.Labels) bool { sl.cache.forEachStale(func(lset labels.Labels) bool {
// Series no longer exposed, mark it stale. // Series no longer exposed, mark it stale.
@ -1737,8 +1741,8 @@ loop:
} }
// Adds samples to the appender, checking the error, and then returns the # of samples added, // Adds samples to the appender, checking the error, and then returns the # of samples added,
// whether the caller should continue to process more samples, and any sample limit errors. // whether the caller should continue to process more samples, and any sample or bucket limit errors.
func (sl *scrapeLoop) checkAddError(ce *cacheEntry, met []byte, tp *int64, err error, sampleLimitErr *error, appErrs *appendErrors) (bool, error) { func (sl *scrapeLoop) checkAddError(ce *cacheEntry, met []byte, tp *int64, err error, sampleLimitErr, bucketLimitErr *error, appErrs *appendErrors) (bool, error) {
switch errors.Cause(err) { switch errors.Cause(err) {
case nil: case nil:
if tp == nil && ce != nil { if tp == nil && ce != nil {
@ -1762,16 +1766,16 @@ func (sl *scrapeLoop) checkAddError(ce *cacheEntry, met []byte, tp *int64, err e
level.Debug(sl.l).Log("msg", "Out of bounds metric", "series", string(met)) level.Debug(sl.l).Log("msg", "Out of bounds metric", "series", string(met))
targetScrapeSampleOutOfBounds.Inc() targetScrapeSampleOutOfBounds.Inc()
return false, nil return false, nil
case storage.ErrHistogramBucketLimit:
appErrs.numHistogramBucketLimit++
level.Debug(sl.l).Log("msg", "Exceeded bucket limit for native histograms", "series", string(met))
targetScrapeNativeHistogramBucketLimit.Inc()
return false, nil
case errSampleLimit: case errSampleLimit:
// Keep on parsing output if we hit the limit, so we report the correct // Keep on parsing output if we hit the limit, so we report the correct
// total number of samples scraped. // total number of samples scraped.
*sampleLimitErr = err *sampleLimitErr = err
return false, nil return false, nil
case errBucketLimit:
// Keep on parsing output if we hit the limit, so we report the correct
// total number of samples scraped.
*bucketLimitErr = err
return false, nil
default: default:
return false, err return false, err
} }

View file

@ -1788,7 +1788,10 @@ func TestScrapeLoop_HistogramBucketLimit(t *testing.T) {
now = time.Now() now = time.Now()
total, added, seriesAdded, err = sl.append(app, msg, "application/vnd.google.protobuf", now) total, added, seriesAdded, err = sl.append(app, msg, "application/vnd.google.protobuf", now)
require.NoError(t, err) if err != errBucketLimit {
t.Fatalf("Did not see expected histogram bucket limit error: %s", err)
}
require.NoError(t, app.Rollback())
require.Equal(t, 1, total) require.Equal(t, 1, total)
require.Equal(t, 1, added) require.Equal(t, 1, added)
require.Equal(t, 0, seriesAdded) require.Equal(t, 0, seriesAdded)
@ -3010,7 +3013,7 @@ func TestReuseCacheRace(*testing.T) {
func TestCheckAddError(t *testing.T) { func TestCheckAddError(t *testing.T) {
var appErrs appendErrors var appErrs appendErrors
sl := scrapeLoop{l: log.NewNopLogger()} sl := scrapeLoop{l: log.NewNopLogger()}
sl.checkAddError(nil, nil, nil, storage.ErrOutOfOrderSample, nil, &appErrs) sl.checkAddError(nil, nil, nil, storage.ErrOutOfOrderSample, nil, nil, &appErrs)
require.Equal(t, 1, appErrs.numOutOfOrder) require.Equal(t, 1, appErrs.numOutOfOrder)
} }

View file

@ -314,7 +314,10 @@ func (ts Targets) Len() int { return len(ts) }
func (ts Targets) Less(i, j int) bool { return ts[i].URL().String() < ts[j].URL().String() } func (ts Targets) Less(i, j int) bool { return ts[i].URL().String() < ts[j].URL().String() }
func (ts Targets) Swap(i, j int) { ts[i], ts[j] = ts[j], ts[i] } func (ts Targets) Swap(i, j int) { ts[i], ts[j] = ts[j], ts[i] }
var errSampleLimit = errors.New("sample limit exceeded") var (
errSampleLimit = errors.New("sample limit exceeded")
errBucketLimit = errors.New("histogram bucket limit exceeded")
)
// limitAppender limits the number of total appended samples in a batch. // limitAppender limits the number of total appended samples in a batch.
type limitAppender struct { type limitAppender struct {
@ -366,12 +369,12 @@ type bucketLimitAppender struct {
func (app *bucketLimitAppender) AppendHistogram(ref storage.SeriesRef, lset labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) { func (app *bucketLimitAppender) AppendHistogram(ref storage.SeriesRef, lset labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) {
if h != nil { if h != nil {
if len(h.PositiveBuckets)+len(h.NegativeBuckets) > app.limit { if len(h.PositiveBuckets)+len(h.NegativeBuckets) > app.limit {
return 0, storage.ErrHistogramBucketLimit return 0, errBucketLimit
} }
} }
if fh != nil { if fh != nil {
if len(fh.PositiveBuckets)+len(fh.NegativeBuckets) > app.limit { if len(fh.PositiveBuckets)+len(fh.NegativeBuckets) > app.limit {
return 0, storage.ErrHistogramBucketLimit return 0, errBucketLimit
} }
} }
ref, err := app.Appender.AppendHistogram(ref, lset, t, h, fh) ref, err := app.Appender.AppendHistogram(ref, lset, t, h, fh)

View file

@ -46,7 +46,6 @@ var (
ErrHistogramNegativeBucketCount = errors.New("histogram has a bucket whose observation count is negative") ErrHistogramNegativeBucketCount = errors.New("histogram has a bucket whose observation count is negative")
ErrHistogramSpanNegativeOffset = errors.New("histogram has a span whose offset is negative") ErrHistogramSpanNegativeOffset = errors.New("histogram has a span whose offset is negative")
ErrHistogramSpansBucketsMismatch = errors.New("histogram spans specify different number of buckets than provided") ErrHistogramSpansBucketsMismatch = errors.New("histogram spans specify different number of buckets than provided")
ErrHistogramBucketLimit = errors.New("histogram bucket limit exceeded")
) )
// SeriesRef is a generic series reference. In prometheus it is either a // SeriesRef is a generic series reference. In prometheus it is either a