diff --git a/config/config.go b/config/config.go index 5c51d5a0d..31ebb288e 100644 --- a/config/config.go +++ b/config/config.go @@ -489,6 +489,9 @@ type ScrapeConfig struct { // More than this label value length post metric-relabeling will cause the // scrape to fail. LabelValueLengthLimit uint `yaml:"label_value_length_limit,omitempty"` + // More than this many buckets in a native histogram will cause the scrape to + // fail. + NativeHistogramBucketLimit uint `yaml:"bucket_limit,omitempty"` // We cannot do proper Go type embedding below as the parser will then parse // values arbitrarily into the overflow maps of further-down types. diff --git a/scrape/scrape.go b/scrape/scrape.go index 5c649e729..7c24c1070 100644 --- a/scrape/scrape.go +++ b/scrape/scrape.go @@ -191,6 +191,12 @@ var ( }, []string{"scrape_job"}, ) + targetScrapeNativeHistogramBucketLimit = prometheus.NewCounter( + prometheus.CounterOpts{ + Name: "prometheus_target_scrapes_histogram_exceeded_bucket_limit_total", + Help: "Total number of native histograms rejected due to exceeding the bucket limit.", + }, + ) ) func init() { @@ -216,6 +222,7 @@ func init() { targetScrapeExemplarOutOfOrder, targetScrapePoolExceededLabelLimits, targetSyncFailed, + targetScrapeNativeHistogramBucketLimit, ) } @@ -256,6 +263,7 @@ type scrapeLoopOptions struct { target *Target scraper scraper sampleLimit int + bucketLimit int labelLimits *labelLimits honorLabels bool honorTimestamps bool @@ -319,6 +327,7 @@ func newScrapePool(cfg *config.ScrapeConfig, app storage.Appendable, jitterSeed jitterSeed, opts.honorTimestamps, opts.sampleLimit, + opts.bucketLimit, opts.labelLimits, opts.interval, opts.timeout, @@ -412,6 +421,7 @@ func (sp *scrapePool) reload(cfg *config.ScrapeConfig) error { timeout = time.Duration(sp.config.ScrapeTimeout) bodySizeLimit = int64(sp.config.BodySizeLimit) sampleLimit = int(sp.config.SampleLimit) + bucketLimit = int(sp.config.NativeHistogramBucketLimit) labelLimits = &labelLimits{ labelLimit: int(sp.config.LabelLimit), labelNameLengthLimit: int(sp.config.LabelNameLengthLimit), @@ -446,6 +456,7 @@ func (sp *scrapePool) reload(cfg *config.ScrapeConfig) error { target: t, scraper: s, sampleLimit: sampleLimit, + bucketLimit: bucketLimit, labelLimits: labelLimits, honorLabels: honorLabels, honorTimestamps: honorTimestamps, @@ -530,6 +541,7 @@ func (sp *scrapePool) sync(targets []*Target) { timeout = time.Duration(sp.config.ScrapeTimeout) bodySizeLimit = int64(sp.config.BodySizeLimit) sampleLimit = int(sp.config.SampleLimit) + bucketLimit = int(sp.config.NativeHistogramBucketLimit) labelLimits = &labelLimits{ labelLimit: int(sp.config.LabelLimit), labelNameLengthLimit: int(sp.config.LabelNameLengthLimit), @@ -559,6 +571,7 @@ func (sp *scrapePool) sync(targets []*Target) { target: t, scraper: s, sampleLimit: sampleLimit, + bucketLimit: bucketLimit, labelLimits: labelLimits, honorLabels: honorLabels, honorTimestamps: honorTimestamps, @@ -731,7 +744,7 @@ func mutateReportSampleLabels(lset labels.Labels, target *Target) labels.Labels } // appender returns an appender for ingested samples from the target. 
-func appender(app storage.Appender, limit int) storage.Appender { +func appender(app storage.Appender, limit, bucketLimit int) storage.Appender { app = &timeLimitAppender{ Appender: app, maxTime: timestamp.FromTime(time.Now().Add(maxAheadTime)), @@ -744,6 +757,13 @@ func appender(app storage.Appender, limit int) storage.Appender { limit: limit, } } + + if bucketLimit > 0 { + app = &bucketLimitAppender{ + Appender: app, + limit: bucketLimit, + } + } return app } @@ -872,6 +892,7 @@ type scrapeLoop struct { forcedErr error forcedErrMtx sync.Mutex sampleLimit int + bucketLimit int labelLimits *labelLimits interval time.Duration timeout time.Duration @@ -1152,6 +1173,7 @@ func newScrapeLoop(ctx context.Context, jitterSeed uint64, honorTimestamps bool, sampleLimit int, + bucketLimit int, labelLimits *labelLimits, interval time.Duration, timeout time.Duration, @@ -1195,6 +1217,7 @@ func newScrapeLoop(ctx context.Context, appenderCtx: appenderCtx, honorTimestamps: honorTimestamps, sampleLimit: sampleLimit, + bucketLimit: bucketLimit, labelLimits: labelLimits, interval: interval, timeout: timeout, @@ -1462,10 +1485,11 @@ func (sl *scrapeLoop) getCache() *scrapeCache { } type appendErrors struct { - numOutOfOrder int - numDuplicates int - numOutOfBounds int - numExemplarOutOfOrder int + numOutOfOrder int + numDuplicates int + numOutOfBounds int + numExemplarOutOfOrder int + numHistogramBucketLimit int } func (sl *scrapeLoop) append(app storage.Appender, b []byte, contentType string, ts time.Time) (total, added, seriesAdded int, err error) { @@ -1510,7 +1534,7 @@ func (sl *scrapeLoop) append(app storage.Appender, b []byte, contentType string, } // Take an appender with limits. - app = appender(app, sl.sampleLimit) + app = appender(app, sl.sampleLimit, sl.bucketLimit) defer func() { if err != nil { @@ -1693,6 +1717,9 @@ loop: if appErrs.numExemplarOutOfOrder > 0 { level.Warn(sl.l).Log("msg", "Error on ingesting out-of-order exemplars", "num_dropped", appErrs.numExemplarOutOfOrder) } + if appErrs.numHistogramBucketLimit > 0 { + level.Warn(sl.l).Log("msg", "Error on ingesting native histograms that exceeded bucket limit", "num_dropped", appErrs.numHistogramBucketLimit) + } if err == nil { sl.cache.forEachStale(func(lset labels.Labels) bool { // Series no longer exposed, mark it stale. @@ -1735,6 +1762,11 @@ func (sl *scrapeLoop) checkAddError(ce *cacheEntry, met []byte, tp *int64, err e level.Debug(sl.l).Log("msg", "Out of bounds metric", "series", string(met)) targetScrapeSampleOutOfBounds.Inc() return false, nil + case storage.ErrHistogramBucketLimit: + appErrs.numHistogramBucketLimit++ + level.Debug(sl.l).Log("msg", "Exceeded bucket limit for native histograms", "series", string(met)) + targetScrapeNativeHistogramBucketLimit.Inc() + return false, nil case errSampleLimit: // Keep on parsing output if we hit the limit, so we report the correct // total number of samples scraped. 
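Note on the configuration side of this change: the new NativeHistogramBucketLimit field is read from the scrape config in both reload and sync and handed to each scrape loop, which passes it into appender next to the existing sample limit. The standalone sketch below is not part of the diff; the struct is a stripped-down stand-in for config.ScrapeConfig, and the job name and numbers are invented. It only illustrates how the bucket_limit key maps onto the new field through its yaml tag, with 0 (or omitting the key) leaving the limit disabled, matching the bucketLimit > 0 guard in appender.

package main

import (
	"fmt"

	"gopkg.in/yaml.v2"
)

// scrapeConfig is a minimal stand-in for config.ScrapeConfig, kept here only to
// show the yaml tags; the real type carries many more fields.
type scrapeConfig struct {
	JobName                    string `yaml:"job_name"`
	SampleLimit                uint   `yaml:"sample_limit,omitempty"`
	NativeHistogramBucketLimit uint   `yaml:"bucket_limit,omitempty"`
}

func main() {
	raw := `
job_name: example-job
sample_limit: 2000
bucket_limit: 160
`
	var cfg scrapeConfig
	if err := yaml.Unmarshal([]byte(raw), &cfg); err != nil {
		panic(err)
	}
	// Omitting bucket_limit (or setting it to 0) leaves the new limit disabled.
	fmt.Printf("%+v\n", cfg)
}

When both limits are set, appender wraps the target's appender in both limiters with the bucket check outermost; the new TestScrapePoolAppender case in the test diff below asserts exactly that nesting.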
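That nesting can also be pictured with a tiny self-contained sketch. The types here are simplified stand-ins, not the real storage.Appender wrappers; only the composition order is taken from appender above.

package main

import "fmt"

// Simplified stand-ins for the wrappers in scrape/scrape.go and scrape/target.go;
// they carry no behaviour, only the nesting.
type nopAppender struct{}

type timeLimitAppender struct{ next interface{} }

type limitAppender struct {
	next  interface{}
	limit int
}

type bucketLimitAppender struct {
	next  interface{}
	limit int
}

// wrap mirrors the order used by appender(): the time limit is always applied,
// the sample and bucket limits only when configured, so the bucket check ends
// up outermost.
func wrap(sampleLimit, bucketLimit int) interface{} {
	var app interface{} = nopAppender{}
	app = timeLimitAppender{next: app}
	if sampleLimit > 0 {
		app = limitAppender{next: app, limit: sampleLimit}
	}
	if bucketLimit > 0 {
		app = bucketLimitAppender{next: app, limit: bucketLimit}
	}
	return app
}

func main() {
	// Prints bucketLimitAppender wrapping limitAppender wrapping timeLimitAppender
	// wrapping nopAppender.
	fmt.Printf("%#v\n", wrap(2000, 160))
}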
diff --git a/scrape/scrape_test.go b/scrape/scrape_test.go index 2a2ca0948..c815bf5c6 100644 --- a/scrape/scrape_test.go +++ b/scrape/scrape_test.go @@ -487,7 +487,7 @@ func TestScrapePoolAppender(t *testing.T) { appl, ok := loop.(*scrapeLoop) require.True(t, ok, "Expected scrapeLoop but got %T", loop) - wrapped := appender(appl.appender(context.Background()), 0) + wrapped := appender(appl.appender(context.Background()), 0, 0) tl, ok := wrapped.(*timeLimitAppender) require.True(t, ok, "Expected timeLimitAppender but got %T", wrapped) @@ -503,7 +503,7 @@ func TestScrapePoolAppender(t *testing.T) { appl, ok = loop.(*scrapeLoop) require.True(t, ok, "Expected scrapeLoop but got %T", loop) - wrapped = appender(appl.appender(context.Background()), sampleLimit) + wrapped = appender(appl.appender(context.Background()), sampleLimit, 0) sl, ok := wrapped.(*limitAppender) require.True(t, ok, "Expected limitAppender but got %T", wrapped) @@ -513,6 +513,20 @@ func TestScrapePoolAppender(t *testing.T) { _, ok = tl.Appender.(nopAppender) require.True(t, ok, "Expected base appender but got %T", tl.Appender) + + wrapped = appender(appl.appender(context.Background()), sampleLimit, 100) + + bl, ok := wrapped.(*bucketLimitAppender) + require.True(t, ok, "Expected bucketLimitAppender but got %T", wrapped) + + sl, ok = bl.Appender.(*limitAppender) + require.True(t, ok, "Expected limitAppender but got %T", bl) + + tl, ok = sl.Appender.(*timeLimitAppender) + require.True(t, ok, "Expected timeLimitAppender but got %T", sl.Appender) + + _, ok = tl.Appender.(nopAppender) + require.True(t, ok, "Expected base appender but got %T", tl.Appender) } func TestScrapePoolRaces(t *testing.T) { @@ -610,7 +624,7 @@ func TestScrapeLoopStopBeforeRun(t *testing.T) { nopMutator, nil, nil, 0, true, - 0, + 0, 0, nil, 1, 0, @@ -682,7 +696,7 @@ func TestScrapeLoopStop(t *testing.T) { nil, 0, true, - 0, + 0, 0, nil, 10*time.Millisecond, time.Hour, @@ -758,7 +772,7 @@ func TestScrapeLoopRun(t *testing.T) { nil, 0, true, - 0, + 0, 0, nil, time.Second, time.Hour, @@ -813,7 +827,7 @@ func TestScrapeLoopRun(t *testing.T) { nil, 0, true, - 0, + 0, 0, nil, time.Second, 100*time.Millisecond, @@ -872,7 +886,7 @@ func TestScrapeLoopForcedErr(t *testing.T) { nil, 0, true, - 0, + 0, 0, nil, time.Second, time.Hour, @@ -930,7 +944,7 @@ func TestScrapeLoopMetadata(t *testing.T) { cache, 0, true, - 0, + 0, 0, nil, 0, 0, @@ -987,7 +1001,7 @@ func simpleTestScrapeLoop(t testing.TB) (context.Context, *scrapeLoop) { nil, 0, true, - 0, + 0, 0, nil, 0, 0, @@ -1047,7 +1061,7 @@ func TestScrapeLoopFailWithInvalidLabelsAfterRelabel(t *testing.T) { nil, 0, true, - 0, + 0, 0, nil, 0, 0, @@ -1125,7 +1139,7 @@ func TestScrapeLoopRunCreatesStaleMarkersOnFailedScrape(t *testing.T) { nil, 0, true, - 0, + 0, 0, nil, 10*time.Millisecond, time.Hour, @@ -1188,7 +1202,7 @@ func TestScrapeLoopRunCreatesStaleMarkersOnParseFailure(t *testing.T) { nil, 0, true, - 0, + 0, 0, nil, 10*time.Millisecond, time.Hour, @@ -1254,7 +1268,7 @@ func TestScrapeLoopCache(t *testing.T) { nil, 0, true, - 0, + 0, 0, nil, 10*time.Millisecond, time.Hour, @@ -1337,7 +1351,7 @@ func TestScrapeLoopCacheMemoryExhaustionProtection(t *testing.T) { nil, 0, true, - 0, + 0, 0, nil, 10*time.Millisecond, time.Hour, @@ -1451,7 +1465,7 @@ func TestScrapeLoopAppend(t *testing.T) { nil, 0, true, - 0, + 0, 0, nil, 0, 0, @@ -1546,7 +1560,7 @@ func TestScrapeLoopAppendForConflictingPrefixedLabels(t *testing.T) { return mutateSampleLabels(l, &Target{labels: labels.FromStrings(tc.targetLabels...)}, false, nil) }, 
nil, - func(ctx context.Context) storage.Appender { return app }, nil, 0, true, 0, nil, 0, 0, false, false, nil, false, + func(ctx context.Context) storage.Appender { return app }, nil, 0, true, 0, 0, nil, 0, 0, false, false, nil, false, ) slApp := sl.appender(context.Background()) _, _, _, err := sl.append(slApp, []byte(tc.exposedLabels), "", time.Date(2000, 1, 1, 1, 0, 0, 0, time.UTC)) @@ -1577,7 +1591,7 @@ func TestScrapeLoopAppendCacheEntryButErrNotFound(t *testing.T) { nil, 0, true, - 0, + 0, 0, nil, 0, 0, @@ -1635,7 +1649,7 @@ func TestScrapeLoopAppendSampleLimit(t *testing.T) { nil, 0, true, - app.limit, + app.limit, 0, nil, 0, 0, @@ -1712,7 +1726,7 @@ func TestScrapeLoop_ChangingMetricString(t *testing.T) { nil, 0, true, - 0, + 0, 0, nil, 0, 0, @@ -1760,7 +1774,7 @@ func TestScrapeLoopAppendStaleness(t *testing.T) { nil, 0, true, - 0, + 0, 0, nil, 0, 0, @@ -1811,7 +1825,7 @@ func TestScrapeLoopAppendNoStalenessIfTimestamp(t *testing.T) { nil, 0, true, - 0, + 0, 0, nil, 0, 0, @@ -1922,7 +1936,7 @@ metric_total{n="2"} 2 # {t="2"} 2.0 20000 nil, 0, true, - 0, + 0, 0, nil, 0, 0, @@ -1987,7 +2001,7 @@ func TestScrapeLoopAppendExemplarSeries(t *testing.T) { nil, 0, true, - 0, + 0, 0, nil, 0, 0, @@ -2039,7 +2053,7 @@ func TestScrapeLoopRunReportsTargetDownOnScrapeError(t *testing.T) { nil, 0, true, - 0, + 0, 0, nil, 10*time.Millisecond, time.Hour, @@ -2075,7 +2089,7 @@ func TestScrapeLoopRunReportsTargetDownOnInvalidUTF8(t *testing.T) { nil, 0, true, - 0, + 0, 0, nil, 10*time.Millisecond, time.Hour, @@ -2124,7 +2138,7 @@ func TestScrapeLoopAppendGracefullyIfAmendOrOutOfOrderOrOutOfBounds(t *testing.T nil, 0, true, - 0, + 0, 0, nil, 0, 0, @@ -2169,7 +2183,7 @@ func TestScrapeLoopOutOfBoundsTimeError(t *testing.T) { nil, 0, true, - 0, + 0, 0, nil, 0, 0, @@ -2441,7 +2455,7 @@ func TestScrapeLoop_RespectTimestamps(t *testing.T) { func(ctx context.Context) storage.Appender { return capp }, nil, 0, true, - 0, + 0, 0, nil, 0, 0, @@ -2482,7 +2496,7 @@ func TestScrapeLoop_DiscardTimestamps(t *testing.T) { func(ctx context.Context) storage.Appender { return capp }, nil, 0, false, - 0, + 0, 0, nil, 0, 0, @@ -2522,7 +2536,7 @@ func TestScrapeLoopDiscardDuplicateLabels(t *testing.T) { nil, 0, true, - 0, + 0, 0, nil, 0, 0, @@ -2580,7 +2594,7 @@ func TestScrapeLoopDiscardUnnamedMetrics(t *testing.T) { nil, 0, true, - 0, + 0, 0, nil, 0, 0, @@ -2843,7 +2857,7 @@ func TestScrapeAddFast(t *testing.T) { nil, 0, true, - 0, + 0, 0, nil, 0, 0, @@ -2929,7 +2943,7 @@ func TestScrapeReportSingleAppender(t *testing.T) { nil, 0, true, - 0, + 0, 0, nil, 10*time.Millisecond, time.Hour, @@ -3131,7 +3145,7 @@ func TestScrapeLoopLabelLimit(t *testing.T) { nil, 0, true, - 0, + 0, 0, &test.labelLimits, 0, 0, diff --git a/scrape/target.go b/scrape/target.go index 6c4703118..f916e4549 100644 --- a/scrape/target.go +++ b/scrape/target.go @@ -27,6 +27,7 @@ import ( "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/discovery/targetgroup" + "github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/relabel" "github.com/prometheus/prometheus/model/textparse" @@ -355,6 +356,31 @@ func (app *timeLimitAppender) Append(ref storage.SeriesRef, lset labels.Labels, return ref, nil } +// bucketLimitAppender limits the number of buckets in a native histogram sample.
+type bucketLimitAppender struct { + storage.Appender + + limit int +} + +func (app *bucketLimitAppender) AppendHistogram(ref storage.SeriesRef, lset labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) { + if h != nil { + if len(h.PositiveBuckets)+len(h.NegativeBuckets) > app.limit { + return 0, storage.ErrHistogramBucketLimit + } + } + if fh != nil { + if len(fh.PositiveBuckets)+len(fh.NegativeBuckets) > app.limit { + return 0, storage.ErrHistogramBucketLimit + } + } + ref, err := app.Appender.AppendHistogram(ref, lset, t, h, fh) + if err != nil { + return 0, err + } + return ref, nil +} + // PopulateLabels builds a label set from the given label set and scrape configuration. // It returns a label set before relabeling was applied as the second return value. // Returns the original discovered label set found before relabelling was applied if the target is dropped during relabeling. diff --git a/storage/interface.go b/storage/interface.go index b282f1fc6..18119f254 100644 --- a/storage/interface.go +++ b/storage/interface.go @@ -46,6 +46,7 @@ var ( ErrHistogramNegativeBucketCount = errors.New("histogram has a bucket whose observation count is negative") ErrHistogramSpanNegativeOffset = errors.New("histogram has a span whose offset is negative") ErrHistogramSpansBucketsMismatch = errors.New("histogram spans specify different number of buckets than provided") + ErrHistogramBucketLimit = errors.New("histogram bucket limit exceeded") ) // SeriesRef is a generic series reference. In prometheus it is either a
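To make the rejection rule concrete, here is a small, hedged example; the histogram values and the limit are made up, and the types come from github.com/prometheus/prometheus/model/histogram. A histogram is rejected with storage.ErrHistogramBucketLimit as soon as the combined length of its positive and negative bucket slices exceeds the limit; the new counter metric and log line above record each such rejection. The same rule applies to FloatHistogram via its float bucket slices.

package main

import (
	"fmt"

	"github.com/prometheus/prometheus/model/histogram"
)

// exceedsBucketLimit repeats the check from bucketLimitAppender.AppendHistogram
// above: only the number of populated positive and negative buckets is compared
// against the limit, not the observation counts inside them.
func exceedsBucketLimit(h *histogram.Histogram, limit int) bool {
	return len(h.PositiveBuckets)+len(h.NegativeBuckets) > limit
}

func main() {
	// An example native histogram with 4 positive and 2 negative buckets
	// (values are invented; the bucket slices are delta-encoded as usual).
	h := &histogram.Histogram{
		Schema:          0,
		Count:           9,
		Sum:             23.5,
		PositiveSpans:   []histogram.Span{{Offset: 0, Length: 4}},
		PositiveBuckets: []int64{1, 0, 1, 0},
		NegativeSpans:   []histogram.Span{{Offset: 0, Length: 2}},
		NegativeBuckets: []int64{2, -1},
	}

	fmt.Println(exceedsBucketLimit(h, 10)) // false: 6 buckets, within the limit
	fmt.Println(exceedsBucketLimit(h, 5))  // true: 6 buckets, would be rejected with ErrHistogramBucketLimit
}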