Implement bucket limit for native histograms

Signed-off-by: Jeanette Tan <jeanette.tan@grafana.com>
This commit is contained in:
Jeanette Tan 2023-04-22 03:14:19 +08:00
parent 8f1dc4a70f
commit 4d21ac23e6
5 changed files with 116 additions and 40 deletions

View file

@ -489,6 +489,9 @@ type ScrapeConfig struct {
// More than this label value length post metric-relabeling will cause the
// scrape to fail.
LabelValueLengthLimit uint `yaml:"label_value_length_limit,omitempty"`
// Native histograms with more than this many buckets (positive plus
// negative) are rejected and dropped during the scrape.
NativeHistogramBucketLimit uint `yaml:"bucket_limit,omitempty"`
// We cannot do proper Go type embedding below as the parser will then parse
// values arbitrarily into the overflow maps of further-down types.

View file

@ -191,6 +191,12 @@ var (
},
[]string{"scrape_job"},
)
targetScrapeNativeHistogramBucketLimit = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "prometheus_target_scrapes_histogram_exceeded_bucket_limit_total",
Help: "Total number of native histograms rejected due to exceeding the bucket limit.",
},
)
)
func init() {
@ -216,6 +222,7 @@ func init() {
targetScrapeExemplarOutOfOrder,
targetScrapePoolExceededLabelLimits,
targetSyncFailed,
targetScrapeNativeHistogramBucketLimit,
)
}
@ -256,6 +263,7 @@ type scrapeLoopOptions struct {
target *Target
scraper scraper
sampleLimit int
bucketLimit int
labelLimits *labelLimits
honorLabels bool
honorTimestamps bool
@ -319,6 +327,7 @@ func newScrapePool(cfg *config.ScrapeConfig, app storage.Appendable, jitterSeed
jitterSeed,
opts.honorTimestamps,
opts.sampleLimit,
opts.bucketLimit,
opts.labelLimits,
opts.interval,
opts.timeout,
@ -412,6 +421,7 @@ func (sp *scrapePool) reload(cfg *config.ScrapeConfig) error {
timeout = time.Duration(sp.config.ScrapeTimeout)
bodySizeLimit = int64(sp.config.BodySizeLimit)
sampleLimit = int(sp.config.SampleLimit)
bucketLimit = int(sp.config.NativeHistogramBucketLimit)
labelLimits = &labelLimits{
labelLimit: int(sp.config.LabelLimit),
labelNameLengthLimit: int(sp.config.LabelNameLengthLimit),
@ -446,6 +456,7 @@ func (sp *scrapePool) reload(cfg *config.ScrapeConfig) error {
target: t,
scraper: s,
sampleLimit: sampleLimit,
bucketLimit: bucketLimit,
labelLimits: labelLimits,
honorLabels: honorLabels,
honorTimestamps: honorTimestamps,
@ -530,6 +541,7 @@ func (sp *scrapePool) sync(targets []*Target) {
timeout = time.Duration(sp.config.ScrapeTimeout)
bodySizeLimit = int64(sp.config.BodySizeLimit)
sampleLimit = int(sp.config.SampleLimit)
bucketLimit = int(sp.config.NativeHistogramBucketLimit)
labelLimits = &labelLimits{
labelLimit: int(sp.config.LabelLimit),
labelNameLengthLimit: int(sp.config.LabelNameLengthLimit),
@ -559,6 +571,7 @@ func (sp *scrapePool) sync(targets []*Target) {
target: t,
scraper: s,
sampleLimit: sampleLimit,
bucketLimit: bucketLimit,
labelLimits: labelLimits,
honorLabels: honorLabels,
honorTimestamps: honorTimestamps,
@ -731,7 +744,7 @@ func mutateReportSampleLabels(lset labels.Labels, target *Target) labels.Labels
}
// appender returns an appender for ingested samples from the target.
func appender(app storage.Appender, limit int) storage.Appender {
func appender(app storage.Appender, limit, bucketLimit int) storage.Appender {
app = &timeLimitAppender{
Appender: app,
maxTime: timestamp.FromTime(time.Now().Add(maxAheadTime)),
@ -744,6 +757,13 @@ func appender(app storage.Appender, limit int) storage.Appender {
limit: limit,
}
}
if bucketLimit > 0 {
app = &bucketLimitAppender{
Appender: app,
limit: bucketLimit,
}
}
return app
}
@ -872,6 +892,7 @@ type scrapeLoop struct {
forcedErr error
forcedErrMtx sync.Mutex
sampleLimit int
bucketLimit int
labelLimits *labelLimits
interval time.Duration
timeout time.Duration
@ -1152,6 +1173,7 @@ func newScrapeLoop(ctx context.Context,
jitterSeed uint64,
honorTimestamps bool,
sampleLimit int,
bucketLimit int,
labelLimits *labelLimits,
interval time.Duration,
timeout time.Duration,
@ -1195,6 +1217,7 @@ func newScrapeLoop(ctx context.Context,
appenderCtx: appenderCtx,
honorTimestamps: honorTimestamps,
sampleLimit: sampleLimit,
bucketLimit: bucketLimit,
labelLimits: labelLimits,
interval: interval,
timeout: timeout,
@ -1462,10 +1485,11 @@ func (sl *scrapeLoop) getCache() *scrapeCache {
}
type appendErrors struct {
numOutOfOrder int
numDuplicates int
numOutOfBounds int
numExemplarOutOfOrder int
numOutOfOrder int
numDuplicates int
numOutOfBounds int
numExemplarOutOfOrder int
numHistogramBucketLimit int
}
func (sl *scrapeLoop) append(app storage.Appender, b []byte, contentType string, ts time.Time) (total, added, seriesAdded int, err error) {
@ -1510,7 +1534,7 @@ func (sl *scrapeLoop) append(app storage.Appender, b []byte, contentType string,
}
// Take an appender with limits.
app = appender(app, sl.sampleLimit)
app = appender(app, sl.sampleLimit, sl.bucketLimit)
defer func() {
if err != nil {
@ -1693,6 +1717,9 @@ loop:
if appErrs.numExemplarOutOfOrder > 0 {
level.Warn(sl.l).Log("msg", "Error on ingesting out-of-order exemplars", "num_dropped", appErrs.numExemplarOutOfOrder)
}
if appErrs.numHistogramBucketLimit > 0 {
level.Warn(sl.l).Log("msg", "Error on ingesting native histograms that exceeded bucket limit", "num_dropped", appErrs.numHistogramBucketLimit)
}
if err == nil {
sl.cache.forEachStale(func(lset labels.Labels) bool {
// Series no longer exposed, mark it stale.
@ -1735,6 +1762,11 @@ func (sl *scrapeLoop) checkAddError(ce *cacheEntry, met []byte, tp *int64, err e
level.Debug(sl.l).Log("msg", "Out of bounds metric", "series", string(met))
targetScrapeSampleOutOfBounds.Inc()
return false, nil
case storage.ErrHistogramBucketLimit:
appErrs.numHistogramBucketLimit++
level.Debug(sl.l).Log("msg", "Exceeded bucket limit for native histograms", "series", string(met))
targetScrapeNativeHistogramBucketLimit.Inc()
return false, nil
case errSampleLimit:
// Keep on parsing output if we hit the limit, so we report the correct
// total number of samples scraped.

View file

@ -487,7 +487,7 @@ func TestScrapePoolAppender(t *testing.T) {
appl, ok := loop.(*scrapeLoop)
require.True(t, ok, "Expected scrapeLoop but got %T", loop)
wrapped := appender(appl.appender(context.Background()), 0)
wrapped := appender(appl.appender(context.Background()), 0, 0)
tl, ok := wrapped.(*timeLimitAppender)
require.True(t, ok, "Expected timeLimitAppender but got %T", wrapped)
@ -503,7 +503,7 @@ func TestScrapePoolAppender(t *testing.T) {
appl, ok = loop.(*scrapeLoop)
require.True(t, ok, "Expected scrapeLoop but got %T", loop)
wrapped = appender(appl.appender(context.Background()), sampleLimit)
wrapped = appender(appl.appender(context.Background()), sampleLimit, 0)
sl, ok := wrapped.(*limitAppender)
require.True(t, ok, "Expected limitAppender but got %T", wrapped)
@ -513,6 +513,20 @@ func TestScrapePoolAppender(t *testing.T) {
_, ok = tl.Appender.(nopAppender)
require.True(t, ok, "Expected base appender but got %T", tl.Appender)
wrapped = appender(appl.appender(context.Background()), sampleLimit, 100)
bl, ok := wrapped.(*bucketLimitAppender)
require.True(t, ok, "Expected bucketLimitAppender but got %T", wrapped)
sl, ok = bl.Appender.(*limitAppender)
require.True(t, ok, "Expected limitAppender but got %T", bl)
tl, ok = sl.Appender.(*timeLimitAppender)
require.True(t, ok, "Expected timeLimitAppender but got %T", sl.Appender)
_, ok = tl.Appender.(nopAppender)
require.True(t, ok, "Expected base appender but got %T", tl.Appender)
}
func TestScrapePoolRaces(t *testing.T) {
@ -610,7 +624,7 @@ func TestScrapeLoopStopBeforeRun(t *testing.T) {
nopMutator,
nil, nil, 0,
true,
0,
0, 0,
nil,
1,
0,
@ -682,7 +696,7 @@ func TestScrapeLoopStop(t *testing.T) {
nil,
0,
true,
0,
0, 0,
nil,
10*time.Millisecond,
time.Hour,
@ -758,7 +772,7 @@ func TestScrapeLoopRun(t *testing.T) {
nil,
0,
true,
0,
0, 0,
nil,
time.Second,
time.Hour,
@ -813,7 +827,7 @@ func TestScrapeLoopRun(t *testing.T) {
nil,
0,
true,
0,
0, 0,
nil,
time.Second,
100*time.Millisecond,
@ -872,7 +886,7 @@ func TestScrapeLoopForcedErr(t *testing.T) {
nil,
0,
true,
0,
0, 0,
nil,
time.Second,
time.Hour,
@ -930,7 +944,7 @@ func TestScrapeLoopMetadata(t *testing.T) {
cache,
0,
true,
0,
0, 0,
nil,
0,
0,
@ -987,7 +1001,7 @@ func simpleTestScrapeLoop(t testing.TB) (context.Context, *scrapeLoop) {
nil,
0,
true,
0,
0, 0,
nil,
0,
0,
@ -1047,7 +1061,7 @@ func TestScrapeLoopFailWithInvalidLabelsAfterRelabel(t *testing.T) {
nil,
0,
true,
0,
0, 0,
nil,
0,
0,
@ -1125,7 +1139,7 @@ func TestScrapeLoopRunCreatesStaleMarkersOnFailedScrape(t *testing.T) {
nil,
0,
true,
0,
0, 0,
nil,
10*time.Millisecond,
time.Hour,
@ -1188,7 +1202,7 @@ func TestScrapeLoopRunCreatesStaleMarkersOnParseFailure(t *testing.T) {
nil,
0,
true,
0,
0, 0,
nil,
10*time.Millisecond,
time.Hour,
@ -1254,7 +1268,7 @@ func TestScrapeLoopCache(t *testing.T) {
nil,
0,
true,
0,
0, 0,
nil,
10*time.Millisecond,
time.Hour,
@ -1337,7 +1351,7 @@ func TestScrapeLoopCacheMemoryExhaustionProtection(t *testing.T) {
nil,
0,
true,
0,
0, 0,
nil,
10*time.Millisecond,
time.Hour,
@ -1451,7 +1465,7 @@ func TestScrapeLoopAppend(t *testing.T) {
nil,
0,
true,
0,
0, 0,
nil,
0,
0,
@ -1546,7 +1560,7 @@ func TestScrapeLoopAppendForConflictingPrefixedLabels(t *testing.T) {
return mutateSampleLabels(l, &Target{labels: labels.FromStrings(tc.targetLabels...)}, false, nil)
},
nil,
func(ctx context.Context) storage.Appender { return app }, nil, 0, true, 0, nil, 0, 0, false, false, nil, false,
func(ctx context.Context) storage.Appender { return app }, nil, 0, true, 0, 0, nil, 0, 0, false, false, nil, false,
)
slApp := sl.appender(context.Background())
_, _, _, err := sl.append(slApp, []byte(tc.exposedLabels), "", time.Date(2000, 1, 1, 1, 0, 0, 0, time.UTC))
@ -1577,7 +1591,7 @@ func TestScrapeLoopAppendCacheEntryButErrNotFound(t *testing.T) {
nil,
0,
true,
0,
0, 0,
nil,
0,
0,
@ -1635,7 +1649,7 @@ func TestScrapeLoopAppendSampleLimit(t *testing.T) {
nil,
0,
true,
app.limit,
app.limit, 0,
nil,
0,
0,
@ -1712,7 +1726,7 @@ func TestScrapeLoop_ChangingMetricString(t *testing.T) {
nil,
0,
true,
0,
0, 0,
nil,
0,
0,
@ -1760,7 +1774,7 @@ func TestScrapeLoopAppendStaleness(t *testing.T) {
nil,
0,
true,
0,
0, 0,
nil,
0,
0,
@ -1811,7 +1825,7 @@ func TestScrapeLoopAppendNoStalenessIfTimestamp(t *testing.T) {
nil,
0,
true,
0,
0, 0,
nil,
0,
0,
@ -1922,7 +1936,7 @@ metric_total{n="2"} 2 # {t="2"} 2.0 20000
nil,
0,
true,
0,
0, 0,
nil,
0,
0,
@ -1987,7 +2001,7 @@ func TestScrapeLoopAppendExemplarSeries(t *testing.T) {
nil,
0,
true,
0,
0, 0,
nil,
0,
0,
@ -2039,7 +2053,7 @@ func TestScrapeLoopRunReportsTargetDownOnScrapeError(t *testing.T) {
nil,
0,
true,
0,
0, 0,
nil,
10*time.Millisecond,
time.Hour,
@ -2075,7 +2089,7 @@ func TestScrapeLoopRunReportsTargetDownOnInvalidUTF8(t *testing.T) {
nil,
0,
true,
0,
0, 0,
nil,
10*time.Millisecond,
time.Hour,
@ -2124,7 +2138,7 @@ func TestScrapeLoopAppendGracefullyIfAmendOrOutOfOrderOrOutOfBounds(t *testing.T
nil,
0,
true,
0,
0, 0,
nil,
0,
0,
@ -2169,7 +2183,7 @@ func TestScrapeLoopOutOfBoundsTimeError(t *testing.T) {
nil,
0,
true,
0,
0, 0,
nil,
0,
0,
@ -2441,7 +2455,7 @@ func TestScrapeLoop_RespectTimestamps(t *testing.T) {
func(ctx context.Context) storage.Appender { return capp },
nil, 0,
true,
0,
0, 0,
nil,
0,
0,
@ -2482,7 +2496,7 @@ func TestScrapeLoop_DiscardTimestamps(t *testing.T) {
func(ctx context.Context) storage.Appender { return capp },
nil, 0,
false,
0,
0, 0,
nil,
0,
0,
@ -2522,7 +2536,7 @@ func TestScrapeLoopDiscardDuplicateLabels(t *testing.T) {
nil,
0,
true,
0,
0, 0,
nil,
0,
0,
@ -2580,7 +2594,7 @@ func TestScrapeLoopDiscardUnnamedMetrics(t *testing.T) {
nil,
0,
true,
0,
0, 0,
nil,
0,
0,
@ -2843,7 +2857,7 @@ func TestScrapeAddFast(t *testing.T) {
nil,
0,
true,
0,
0, 0,
nil,
0,
0,
@ -2929,7 +2943,7 @@ func TestScrapeReportSingleAppender(t *testing.T) {
nil,
0,
true,
0,
0, 0,
nil,
10*time.Millisecond,
time.Hour,
@ -3131,7 +3145,7 @@ func TestScrapeLoopLabelLimit(t *testing.T) {
nil,
0,
true,
0,
0, 0,
&test.labelLimits,
0,
0,

View file

@ -27,6 +27,7 @@ import (
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/discovery/targetgroup"
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/relabel"
"github.com/prometheus/prometheus/model/textparse"
@ -355,6 +356,31 @@ func (app *timeLimitAppender) Append(ref storage.SeriesRef, lset labels.Labels,
return ref, nil
}
// bucketLimitAppender wraps a storage.Appender and rejects native histograms
// whose number of buckets exceeds limit.
type bucketLimitAppender struct {
storage.Appender
// limit is the maximum combined number of positive and negative buckets
// a single appended histogram may carry.
limit int
}
// AppendHistogram forwards the histogram to the wrapped appender unless it
// carries too many buckets. For whichever of h and fh is non-nil, the combined
// length of its PositiveBuckets and NegativeBuckets slices is compared against
// app.limit; if it is larger, storage.ErrHistogramBucketLimit is returned
// together with a zero series reference and nothing is appended. Errors from
// the wrapped appender are passed through, also with a zero reference.
func (app *bucketLimitAppender) AppendHistogram(ref storage.SeriesRef, lset labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) {
	switch {
	case h != nil && len(h.PositiveBuckets)+len(h.NegativeBuckets) > app.limit:
		// Integer histogram is over the bucket budget.
		return 0, storage.ErrHistogramBucketLimit
	case fh != nil && len(fh.PositiveBuckets)+len(fh.NegativeBuckets) > app.limit:
		// Float histogram is over the bucket budget.
		return 0, storage.ErrHistogramBucketLimit
	}

	appendedRef, appendErr := app.Appender.AppendHistogram(ref, lset, t, h, fh)
	if appendErr != nil {
		return 0, appendErr
	}
	return appendedRef, nil
}
// PopulateLabels builds a label set from the given label set and scrape configuration.
// It returns a label set before relabeling was applied as the second return value.
// Returns the original discovered label set found before relabelling was applied if the target is dropped during relabeling.

View file

@ -46,6 +46,7 @@ var (
ErrHistogramNegativeBucketCount = errors.New("histogram has a bucket whose observation count is negative")
ErrHistogramSpanNegativeOffset = errors.New("histogram has a span whose offset is negative")
ErrHistogramSpansBucketsMismatch = errors.New("histogram spans specify different number of buckets than provided")
ErrHistogramBucketLimit = errors.New("histogram bucket limit exceeded")
)
// SeriesRef is a generic series reference. In prometheus it is either a