diff --git a/config/config.go b/config/config.go
index c544d7e74..7fa03a144 100644
--- a/config/config.go
+++ b/config/config.go
@@ -610,9 +610,12 @@ type ScrapeConfig struct {
 	// More than this label value length post metric-relabeling will cause the
 	// scrape to fail. 0 means no limit.
 	LabelValueLengthLimit uint `yaml:"label_value_length_limit,omitempty"`
-	// More than this many buckets in a native histogram will cause the scrape to
-	// fail.
+	// If there are more than this many buckets in a native histogram,
+	// buckets will be merged to stay within the limit.
 	NativeHistogramBucketLimit uint `yaml:"native_histogram_bucket_limit,omitempty"`
+	// If the growth factor of one bucket to the next is smaller than this,
+	// buckets will be merged to increase the factor sufficiently.
+	NativeHistogramMinBucketFactor float64 `yaml:"native_histogram_min_bucket_factor,omitempty"`
 	// Keep no more than this many dropped targets per job.
 	// 0 means no limit.
 	KeepDroppedTargets uint `yaml:"keep_dropped_targets,omitempty"`
diff --git a/docs/configuration/configuration.md b/docs/configuration/configuration.md
index 3a7b71a2f..e62d61b09 100644
--- a/docs/configuration/configuration.md
+++ b/docs/configuration/configuration.md
@@ -451,6 +451,46 @@ metric_relabel_configs:
 # native histogram. If this is exceeded, the entire scrape will be treated as
 # failed. 0 means no limit.
 [ native_histogram_bucket_limit: <int> | default = 0 ]
+
+# Lower limit for the growth factor of one bucket to the next in each native
+# histogram. The resolution of a histogram with a lower growth factor will be
+# reduced until it is within the limit.
+# To set an upper limit for the schema (equivalent to "scale" in OTel's
+# exponential histograms), use the following factor limits:
+#
+# +----------------------------+----------------------------+
+# | growth factor              | resulting schema AKA scale |
+# +----------------------------+----------------------------+
+# | 65536                      | -4                         |
+# +----------------------------+----------------------------+
+# | 256                        | -3                         |
+# +----------------------------+----------------------------+
+# | 16                         | -2                         |
+# +----------------------------+----------------------------+
+# | 4                          | -1                         |
+# +----------------------------+----------------------------+
+# | 2                          | 0                          |
+# +----------------------------+----------------------------+
+# | 1.4                        | 1                          |
+# +----------------------------+----------------------------+
+# | 1.1                        | 2                          |
+# +----------------------------+----------------------------+
+# | 1.09                       | 3                          |
+# +----------------------------+----------------------------+
+# | 1.04                       | 4                          |
+# +----------------------------+----------------------------+
+# | 1.02                       | 5                          |
+# +----------------------------+----------------------------+
+# | 1.01                       | 6                          |
+# +----------------------------+----------------------------+
+# | 1.005                      | 7                          |
+# +----------------------------+----------------------------+
+# | 1.002                      | 8                          |
+# +----------------------------+----------------------------+
+#
+# 0 results in the smallest supported factor (which is currently ~1.0027 or
+# schema 8, but might change in the future).
+[ native_histogram_min_bucket_factor: <float> | default = 0 ]
 ```

 Where `<job_name>` must be unique across all scrape configurations.
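(Annotation, not part of the change: the factor thresholds in the table above follow from the definition of native histogram schemas, where schema n implies a per-bucket growth factor of 2^(2^-n), and the pickSchema helper added in scrape.go below picks the highest schema whose factor is still at least the configured minimum. The standalone Go sketch below reproduces the factor column; for schema 8 it prints ≈1.00271, which is where the "~1.0027" note comes from.)

```go
package main

import (
	"fmt"
	"math"
)

func main() {
	// Schema n implies a bucket growth factor of 2^(2^-n); printing it for every
	// supported schema reproduces the thresholds documented in the table above.
	for schema := -4; schema <= 8; schema++ {
		factor := math.Pow(2, math.Pow(2, float64(-schema)))
		fmt.Printf("schema %2d -> growth factor %.6g\n", schema, factor)
	}
}
```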
diff --git a/scrape/scrape.go b/scrape/scrape.go
index 2518912a7..dfa945852 100644
--- a/scrape/scrape.go
+++ b/scrape/scrape.go
@@ -99,6 +99,7 @@ type scrapeLoopOptions struct {
 	scraper         scraper
 	sampleLimit     int
 	bucketLimit     int
+	maxSchema       int32
 	labelLimits     *labelLimits
 	honorLabels     bool
 	honorTimestamps bool
@@ -165,6 +166,7 @@ func newScrapePool(cfg *config.ScrapeConfig, app storage.Appendable, offsetSeed
 			opts.enableCompression,
 			opts.sampleLimit,
 			opts.bucketLimit,
+			opts.maxSchema,
 			opts.labelLimits,
 			opts.interval,
 			opts.timeout,
@@ -270,6 +272,7 @@ func (sp *scrapePool) reload(cfg *config.ScrapeConfig) error {
 		bodySizeLimit = int64(sp.config.BodySizeLimit)
 		sampleLimit   = int(sp.config.SampleLimit)
 		bucketLimit   = int(sp.config.NativeHistogramBucketLimit)
+		maxSchema     = pickSchema(sp.config.NativeHistogramMinBucketFactor)
 		labelLimits   = &labelLimits{
 			labelLimit:           int(sp.config.LabelLimit),
 			labelNameLengthLimit: int(sp.config.LabelNameLengthLimit),
@@ -310,6 +313,7 @@ func (sp *scrapePool) reload(cfg *config.ScrapeConfig) error {
 			scraper:         s,
 			sampleLimit:     sampleLimit,
 			bucketLimit:     bucketLimit,
+			maxSchema:       maxSchema,
 			labelLimits:     labelLimits,
 			honorLabels:     honorLabels,
 			honorTimestamps: honorTimestamps,
@@ -613,7 +617,7 @@ func mutateReportSampleLabels(lset labels.Labels, target *Target) labels.Labels
 }
 // appender returns an appender for ingested samples from the target.
-func appender(app storage.Appender, sampleLimit, bucketLimit int) storage.Appender {
+func appender(app storage.Appender, sampleLimit, bucketLimit int, maxSchema int32) storage.Appender {
 	app = &timeLimitAppender{
 		Appender: app,
 		maxTime:  timestamp.FromTime(time.Now().Add(maxAheadTime)),
 	}
@@ -633,6 +637,14 @@ func appender(app storage.Appender, sampleLimit, bucketLimit int, maxSchema int32) storage.Append
 			limit:    bucketLimit,
 		}
 	}
+
+	if maxSchema < nativeHistogramMaxSchema {
+		app = &maxSchemaAppender{
+			Appender:  app,
+			maxSchema: maxSchema,
+		}
+	}
+
 	return app
 }
@@ -786,6 +798,7 @@ type scrapeLoop struct {
 	forcedErrMtx sync.Mutex
 	sampleLimit  int
 	bucketLimit  int
+	maxSchema    int32
 	labelLimits  *labelLimits
 	interval     time.Duration
 	timeout      time.Duration
@@ -1078,6 +1091,7 @@ func newScrapeLoop(ctx context.Context,
 	enableCompression bool,
 	sampleLimit int,
 	bucketLimit int,
+	maxSchema int32,
 	labelLimits *labelLimits,
 	interval time.Duration,
 	timeout time.Duration,
@@ -1128,6 +1142,7 @@ func newScrapeLoop(ctx context.Context,
 		enableCompression: enableCompression,
 		sampleLimit:       sampleLimit,
 		bucketLimit:       bucketLimit,
+		maxSchema:         maxSchema,
 		labelLimits:       labelLimits,
 		interval:          interval,
 		timeout:           timeout,
@@ -1458,7 +1473,7 @@ func (sl *scrapeLoop) append(app storage.Appender, b []byte, contentType string,
 	}

 	// Take an appender with limits.
-	app = appender(app, sl.sampleLimit, sl.bucketLimit)
+	app = appender(app, sl.sampleLimit, sl.bucketLimit, sl.maxSchema)

 	defer func() {
 		if err != nil {
@@ -1906,3 +1921,18 @@ func TargetFromContext(ctx context.Context) (*Target, bool) {
 	t, ok := ctx.Value(ctxKeyTarget).(*Target)
 	return t, ok
 }
+
+func pickSchema(bucketFactor float64) int32 {
+	if bucketFactor <= 1 {
+		bucketFactor = 1.00271
+	}
+	floor := math.Floor(-math.Log2(math.Log2(bucketFactor)))
+	switch {
+	case floor >= float64(nativeHistogramMaxSchema):
+		return nativeHistogramMaxSchema
+	case floor <= float64(nativeHistogramMinSchema):
+		return nativeHistogramMinSchema
+	default:
+		return int32(floor)
+	}
+}
diff --git a/scrape/scrape_test.go b/scrape/scrape_test.go
index 1a416eeb6..b4bfdc6e4 100644
--- a/scrape/scrape_test.go
+++ b/scrape/scrape_test.go
@@ -513,7 +513,7 @@ func TestScrapePoolAppender(t *testing.T) {
 	appl, ok := loop.(*scrapeLoop)
 	require.True(t, ok, "Expected scrapeLoop but got %T", loop)

-	wrapped := appender(appl.appender(context.Background()), 0, 0)
+	wrapped := appender(appl.appender(context.Background()), 0, 0, nativeHistogramMaxSchema)

 	tl, ok := wrapped.(*timeLimitAppender)
 	require.True(t, ok, "Expected timeLimitAppender but got %T", wrapped)
@@ -529,7 +529,7 @@ func TestScrapePoolAppender(t *testing.T) {
 	appl, ok = loop.(*scrapeLoop)
 	require.True(t, ok, "Expected scrapeLoop but got %T", loop)

-	wrapped = appender(appl.appender(context.Background()), sampleLimit, 0)
+	wrapped = appender(appl.appender(context.Background()), sampleLimit, 0, nativeHistogramMaxSchema)

 	sl, ok := wrapped.(*limitAppender)
 	require.True(t, ok, "Expected limitAppender but got %T", wrapped)
@@ -540,7 +540,7 @@ func TestScrapePoolAppender(t *testing.T) {
 	_, ok = tl.Appender.(nopAppender)
 	require.True(t, ok, "Expected base appender but got %T", tl.Appender)

-	wrapped = appender(appl.appender(context.Background()), sampleLimit, 100)
+	wrapped = appender(appl.appender(context.Background()), sampleLimit, 100, nativeHistogramMaxSchema)

 	bl, ok := wrapped.(*bucketLimitAppender)
 	require.True(t, ok, "Expected bucketLimitAppender but got %T", wrapped)
@@ -553,6 +553,23 @@ func TestScrapePoolAppender(t *testing.T) {

 	_, ok = tl.Appender.(nopAppender)
 	require.True(t, ok, "Expected base appender but got %T", tl.Appender)
+
+	wrapped = appender(appl.appender(context.Background()), sampleLimit, 100, 0)
+
+	ml, ok := wrapped.(*maxSchemaAppender)
+	require.True(t, ok, "Expected maxSchemaAppender but got %T", wrapped)
+
+	bl, ok = ml.Appender.(*bucketLimitAppender)
+	require.True(t, ok, "Expected bucketLimitAppender but got %T", wrapped)
+
+	sl, ok = bl.Appender.(*limitAppender)
+	require.True(t, ok, "Expected limitAppender but got %T", bl)
+
+	tl, ok = sl.Appender.(*timeLimitAppender)
+	require.True(t, ok, "Expected timeLimitAppender but got %T", sl.Appender)
+
+	_, ok = tl.Appender.(nopAppender)
+	require.True(t, ok, "Expected base appender but got %T", tl.Appender)
 }

 func TestScrapePoolRaces(t *testing.T) {
@@ -653,7 +670,7 @@ func newBasicScrapeLoop(t testing.TB, ctx context.Context, scraper scraper, app
 		true,
 		false,
 		true,
-		0, 0,
+		0, 0, nativeHistogramMaxSchema,
 		nil,
 		interval,
 		time.Hour,
@@ -796,7 +813,7 @@ func TestScrapeLoopRun(t *testing.T) {
 		true,
 		false,
 		true,
-		0, 0,
+		0, 0, nativeHistogramMaxSchema,
 		nil,
 		time.Second,
 		time.Hour,
@@ -942,7 +959,7 @@ func TestScrapeLoopMetadata(t *testing.T) {
 		true,
 		false,
 		true,
-		0, 0,
+		0, 0, nativeHistogramMaxSchema,
 		nil,
 		0,
 		0,
@@ -3465,3 +3482,73 @@ func TestScrapeLoopCompression(t *testing.T) {
 		})
 	}
 }
+
+func TestPickSchema(t *testing.T) {
+	tcs := []struct {
+		factor float64
+		schema int32
+	}{
+		{
+			factor: 65536,
+			schema: -4,
+		},
+		{
+			factor: 256,
+			schema: -3,
+		},
+		{
+			factor: 16,
+			schema: -2,
+		},
+		{
+			factor: 4,
+			schema: -1,
+		},
+		{
+			factor: 2,
+			schema: 0,
+		},
+		{
+			factor: 1.4,
+			schema: 1,
+		},
+		{
+			factor: 1.1,
+			schema: 2,
+		},
+		{
+			factor: 1.09,
+			schema: 3,
+		},
+		{
+			factor: 1.04,
+			schema: 4,
+		},
+		{
+			factor: 1.02,
+			schema: 5,
+		},
+		{
+			factor: 1.01,
+			schema: 6,
+		},
+		{
+			factor: 1.005,
+			schema: 7,
+		},
+		{
+			factor: 1.002,
+			schema: 8,
+		},
+		// The default value of native_histogram_min_bucket_factor
+		{
+			factor: 0,
+			schema: 8,
+		},
+	}
+
+	for _, tc := range tcs {
+		schema := pickSchema(tc.factor)
+		require.Equal(t, tc.schema, schema)
+	}
+}
diff --git a/scrape/target.go b/scrape/target.go
index 0605f5349..c9287f818 100644
--- a/scrape/target.go
+++ b/scrape/target.go
@@ -387,6 +387,35 @@ func (app *bucketLimitAppender) AppendHistogram(ref storage.SeriesRef, lset labe
 	return ref, nil
 }

+const (
+	nativeHistogramMaxSchema int32 = 8
+	nativeHistogramMinSchema int32 = -4
+)
+
+type maxSchemaAppender struct {
+	storage.Appender
+
+	maxSchema int32
+}
+
+func (app *maxSchemaAppender) AppendHistogram(ref storage.SeriesRef, lset labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) {
+	if h != nil {
+		if h.Schema > app.maxSchema {
+			h = h.ReduceResolution(app.maxSchema)
+		}
+	}
+	if fh != nil {
+		if fh.Schema > app.maxSchema {
+			fh = fh.ReduceResolution(app.maxSchema)
+		}
+	}
+	ref, err := app.Appender.AppendHistogram(ref, lset, t, h, fh)
+	if err != nil {
+		return 0, err
+	}
+	return ref, nil
+}
+
 // PopulateLabels builds a label set from the given label set and scrape configuration.
 // It returns a label set before relabeling was applied as the second return value.
 // Returns the original discovered label set found before relabelling was applied if the target is dropped during relabeling.
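(Annotation, not part of the change: maxSchemaAppender only delegates to the histogram's own ReduceResolution method. The standalone sketch below, using a hypothetical schema-0 histogram shaped like the fixture in target_test.go, shows the bucket merging that reduction performs.)

```go
package main

import (
	"fmt"

	"github.com/prometheus/prometheus/model/histogram"
)

func main() {
	// Hypothetical schema-0 histogram: three positive buckets holding 3 observations
	// each (PositiveBuckets is delta-encoded, so 3, 3+0, 3+0).
	h := &histogram.Histogram{
		Schema:          0,
		Count:           9,
		Sum:             33,
		PositiveSpans:   []histogram.Span{{Offset: 0, Length: 3}},
		PositiveBuckets: []int64{3, 0, 0},
	}

	// This is the operation maxSchemaAppender applies when h.Schema > maxSchema:
	// dropping from schema 0 to -1 merges neighbouring bucket pairs, halving the
	// resolution while preserving the total Count and Sum.
	h = h.ReduceResolution(-1)
	fmt.Println(h.Schema, h.PositiveSpans, h.PositiveBuckets)
}
```

The test in target_test.go that follows only asserts on the resulting Schema field.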
diff --git a/scrape/target_test.go b/scrape/target_test.go
index f37c75a76..dac502a80 100644
--- a/scrape/target_test.go
+++ b/scrape/target_test.go
@@ -590,3 +590,64 @@ func TestBucketLimitAppender(t *testing.T) {
 			})
 		}
 	}
 }
+
+func TestMaxSchemaAppender(t *testing.T) {
+	example := histogram.Histogram{
+		Schema:        0,
+		Count:         21,
+		Sum:           33,
+		ZeroThreshold: 0.001,
+		ZeroCount:     3,
+		PositiveSpans: []histogram.Span{
+			{Offset: 0, Length: 3},
+		},
+		PositiveBuckets: []int64{3, 0, 0},
+		NegativeSpans: []histogram.Span{
+			{Offset: 0, Length: 3},
+		},
+		NegativeBuckets: []int64{3, 0, 0},
+	}
+
+	cases := []struct {
+		h            histogram.Histogram
+		maxSchema    int32
+		expectSchema int32
+	}{
+		{
+			h:            example,
+			maxSchema:    -1,
+			expectSchema: -1,
+		},
+		{
+			h:            example,
+			maxSchema:    0,
+			expectSchema: 0,
+		},
+	}
+
+	resApp := &collectResultAppender{}
+
+	for _, c := range cases {
+		for _, floatHisto := range []bool{true, false} {
+			t.Run(fmt.Sprintf("floatHistogram=%t", floatHisto), func(t *testing.T) {
+				app := &maxSchemaAppender{Appender: resApp, maxSchema: c.maxSchema}
+				ts := int64(10 * time.Minute / time.Millisecond)
+				lbls := labels.FromStrings("__name__", "sparse_histogram_series")
+				var err error
+				if floatHisto {
+					fh := c.h.Copy().ToFloat(nil)
+					_, err = app.AppendHistogram(0, lbls, ts, nil, fh)
+					require.Equal(t, c.expectSchema, fh.Schema)
+					require.NoError(t, err)
+
+				} else {
+					h := c.h.Copy()
+					_, err = app.AppendHistogram(0, lbls, ts, h, nil)
+					require.Equal(t, c.expectSchema, h.Schema)
+					require.NoError(t, err)
+				}
+				require.NoError(t, app.Commit())
+			})
+		}
+	}
+}
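(Annotation, not part of the change: an end-to-end sketch of how the new option ties together. maxSchemaFor below is a hypothetical helper that mirrors pickSchema's arithmetic from scrape.go above; with `native_histogram_min_bucket_factor: 1.1` it yields a maximum schema of 2, so a scraped schema-3 histogram would be reduced to schema 2 before it is appended.)

```go
package main

import (
	"fmt"
	"math"
)

// maxSchemaFor mirrors pickSchema's arithmetic: the highest schema n with
// 2^(2^-n) >= factor is floor(-log2(log2(factor))), clamped to [-4, 8].
func maxSchemaFor(factor float64) int32 {
	if factor <= 1 {
		factor = 1.00271 // same fallback pickSchema uses for the default value 0
	}
	n := math.Floor(-math.Log2(math.Log2(factor)))
	switch {
	case n >= 8:
		return 8
	case n <= -4:
		return -4
	default:
		return int32(n)
	}
}

func main() {
	// With native_histogram_min_bucket_factor: 1.1 the scrape loop caps the schema
	// at 2, so an incoming schema-3 histogram gets its resolution halved.
	fmt.Println(maxSchemaFor(1.1)) // 2
	fmt.Println(maxSchemaFor(0))   // 8 (the default keeps the finest resolution)
}
```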