fix according to code review
Signed-off-by: Jeanette Tan <jeanette.tan@grafana.com>
parent 4a5d8461ba
commit 9adc1699c3
@@ -30,7 +30,7 @@ import (
 type FloatHistogram struct {
 	// Counter reset information.
 	CounterResetHint CounterResetHint
-	// Currently valid schema numbers are -4 <= n <= 8 for exponential buckets,
+	// Currently valid schema numbers are -4 <= n <= 8 for exponential buckets.
 	// They are all for base-2 bucket schemas, where 1 is a bucket boundary in
 	// each case, and then each power of two is divided into 2^n logarithmic buckets.
 	// Or in other words, each bucket boundary is the previous boundary times
@@ -54,7 +54,7 @@ type FloatHistogram struct {
 	// This slice is interned, to be treated as immutable and copied by reference.
 	// These numbers should be strictly increasing. This field is only used when the
 	// schema is for custom buckets, and the ZeroThreshold, ZeroCount, NegativeSpans
-	// and NegativeBuckets fields are not used.
+	// and NegativeBuckets fields are not used in that case.
 	CustomValues []float64
 }
 
@@ -141,7 +141,8 @@ func (h *FloatHistogram) CopyTo(to *FloatHistogram) {
 
 // CopyToSchema works like Copy, but the returned deep copy has the provided
 // target schema, which must be ≤ the original schema (i.e. it must have a lower
-// resolution).
+// resolution). This method panics if a custom buckets schema is used in the
+// receiving FloatHistogram or as the provided targetSchema.
 func (h *FloatHistogram) CopyToSchema(targetSchema int32) *FloatHistogram {
 	if targetSchema == h.Schema {
 		// Fast path.
@@ -253,7 +254,7 @@ func (h *FloatHistogram) TestExpression() string {
 	return "{{" + strings.Join(res, " ") + "}}"
 }
 
-// ZeroBucket returns the zero bucket.
+// ZeroBucket returns the zero bucket. This method panics if the schema is for custom buckets.
 func (h *FloatHistogram) ZeroBucket() Bucket[float64] {
 	if h.UsesCustomBuckets() {
 		panic("histograms with custom buckets have no zero bucket")
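The reworded comment above documents a panic path. As a rough sketch (the helper below is hypothetical and not part of this commit; only FloatHistogram, UsesCustomBuckets and ZeroBucket come from the code being changed), a caller that may receive custom-buckets histograms would guard the call:

import (
	"math"

	"github.com/prometheus/prometheus/model/histogram"
)

// zeroCountOrNaN is an illustrative caller-side guard: histograms with custom
// buckets have no zero bucket, so calling ZeroBucket on them would panic.
func zeroCountOrNaN(h *histogram.FloatHistogram) float64 {
	if h.UsesCustomBuckets() {
		return math.NaN()
	}
	return h.ZeroBucket().Count
}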
@@ -922,7 +923,8 @@ func (h *FloatHistogram) reconcileZeroBuckets(other *FloatHistogram) float64 {
 //
 // targetSchema must be ≤ the schema of FloatHistogram (and of course within the
 // legal values for schemas in general). The buckets are merged to match the
-// targetSchema prior to iterating (without mutating FloatHistogram).
+// targetSchema prior to iterating (without mutating FloatHistogram), but custom buckets
+// schemas cannot be merged with other schemas.
 func (h *FloatHistogram) floatBucketIterator(
 	positive bool, absoluteStartValue float64, targetSchema int32,
 ) floatBucketIterator {
@@ -1317,6 +1319,8 @@ func FloatBucketsMatch(b1, b2 []float64) bool {
 
 // ReduceResolution reduces the float histogram's spans, buckets into target schema.
 // The target schema must be smaller than the current float histogram's schema.
+// This will panic if the histogram has custom buckets or if the target schema is
+// a custom buckets schema.
 func (h *FloatHistogram) ReduceResolution(targetSchema int32) *FloatHistogram {
 	if h.UsesCustomBuckets() {
 		panic("cannot reduce resolution when there are custom buckets")
@@ -777,3 +777,10 @@ func reduceResolution[IBC InternalBucketCount](
 
 	return targetSpans, targetBuckets
 }
+
+func clearIfNotNil[T any](items []T) []T {
+	if items == nil {
+		return nil
+	}
+	return items[:0]
+}
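The helper added above keeps the distinction between a nil slice and an empty one while retaining capacity for reuse. A quick illustration (the values are assumed, not from this commit):

// Illustrative use of clearIfNotNil.
buckets := []float64{1, 2, 3}
buckets = clearIfNotNil(buckets) // length 0, capacity 3: the backing array is kept for reuse

var unset []float64
unset = clearIfNotNil(unset) // stays nil, so "never set" and "set but empty" remain distinguishable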
@@ -74,7 +74,7 @@ type Histogram struct {
 	// This slice is interned, to be treated as immutable and copied by reference.
 	// These numbers should be strictly increasing. This field is only used when the
 	// schema is for custom buckets, and the ZeroThreshold, ZeroCount, NegativeSpans
-	// and NegativeBuckets fields are not used.
+	// and NegativeBuckets fields are not used in that case.
 	CustomValues []float64
 }
 
@@ -199,7 +199,7 @@ func (h *Histogram) String() string {
 	return sb.String()
 }
 
-// ZeroBucket returns the zero bucket.
+// ZeroBucket returns the zero bucket. This method panics if the schema is for custom buckets.
 func (h *Histogram) ZeroBucket() Bucket[uint64] {
 	if h.UsesCustomBuckets() {
 		panic("histograms with custom buckets have no zero bucket")
@@ -417,13 +417,6 @@ func resize[T any](items []T, n int) []T {
 	return items[:n]
 }
 
-func clearIfNotNil[T any](items []T) []T {
-	if items == nil {
-		return nil
-	}
-	return items[:0]
-}
-
 // Validate validates consistency between span and bucket slices. Also, buckets are checked
 // against negative values. We check to make sure there are no unexpected fields or field values
 // based on the exponential / custom buckets schema.
@@ -615,6 +608,8 @@ func (c *cumulativeBucketIterator) At() Bucket[uint64] {
 
 // ReduceResolution reduces the histogram's spans, buckets into target schema.
 // The target schema must be smaller than the current histogram's schema.
+// This will panic if the histogram has custom buckets or if the target schema is
+// a custom buckets schema.
 func (h *Histogram) ReduceResolution(targetSchema int32) *Histogram {
 	if h.UsesCustomBuckets() {
 		panic("cannot reduce resolution when there are custom buckets")
@@ -208,14 +208,14 @@ func parseLoad(lines []string, i int) (int, *loadCmd, error) {
 	}
 	parts := patLoad.FindStringSubmatch(lines[i])
 	var (
-		withNhcb = parts[1] == "with_nhcb"
+		withNHCB = parts[1] == "with_nhcb"
 		step     = parts[2]
 	)
 	gap, err := model.ParseDuration(step)
 	if err != nil {
 		return i, nil, raise(i, "invalid step definition %q: %s", step, err)
 	}
-	cmd := newLoadCmd(time.Duration(gap), withNhcb)
+	cmd := newLoadCmd(time.Duration(gap), withNHCB)
 	for i+1 < len(lines) {
 		i++
 		defLine := lines[i]
@@ -253,12 +253,12 @@ func (t *test) parseEval(lines []string, i int) (int, *evalCmd, error) {
 
 	isInstant := instantParts != nil
 
-	var withNhcb bool
+	var withNHCB bool
 	var mod string
 	var expr string
 
 	if isInstant {
-		withNhcb = instantParts[1] == "with_nhcb"
+		withNHCB = instantParts[1] == "with_nhcb"
 		mod = instantParts[2]
 		expr = instantParts[4]
 	} else {
@@ -332,7 +332,7 @@ func (t *test) parseEval(lines []string, i int) (int, *evalCmd, error) {
 	case "warn":
 		cmd.warn = true
 	}
-	cmd.withNhcb = withNhcb
+	cmd.withNHCB = withNHCB
 
 	for j := 1; i+1 < len(lines); j++ {
 		i++
@@ -419,16 +419,16 @@ type loadCmd struct {
 	metrics   map[uint64]labels.Labels
 	defs      map[uint64][]promql.Sample
 	exemplars map[uint64][]exemplar.Exemplar
-	withNhcb  bool
+	withNHCB  bool
 }
 
-func newLoadCmd(gap time.Duration, withNhcb bool) *loadCmd {
+func newLoadCmd(gap time.Duration, withNHCB bool) *loadCmd {
 	return &loadCmd{
 		gap:       gap,
 		metrics:   map[uint64]labels.Labels{},
 		defs:      map[uint64][]promql.Sample{},
 		exemplars: map[uint64][]exemplar.Exemplar{},
-		withNhcb:  withNhcb,
+		withNHCB:  withNHCB,
 	}
 }
 
@@ -467,7 +467,7 @@ func (cmd *loadCmd) append(a storage.Appender) error {
 			}
 		}
 	}
-	if cmd.withNhcb {
+	if cmd.withNHCB {
 		return cmd.appendCustomHistogram(a)
 	}
 	return nil
@@ -535,11 +535,11 @@ func processClassicHistogramSeries(m labels.Labels, suffix string, histMap map[u
 func processUpperBoundsAndCreateBaseHistogram(upperBounds0 []float64) ([]float64, *histogram.FloatHistogram) {
 	sort.Float64s(upperBounds0)
 	upperBounds := make([]float64, 0, len(upperBounds0))
-	prevLe := math.Inf(-1)
+	prevLE := math.Inf(-1)
 	for _, le := range upperBounds0 {
-		if le != prevLe { // deduplicate
+		if le != prevLE { // deduplicate
 			upperBounds = append(upperBounds, le)
-			prevLe = le
+			prevLE = le
 		}
 	}
 	var customBounds []float64
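The rename above only touches the prevLe/prevLE identifier; for context, the deduplication works because the bounds are sorted first, so equal values end up adjacent and a single previous-value comparison removes them. The same pattern as a self-contained sketch with assumed inputs:

package main

import (
	"fmt"
	"math"
	"sort"
)

func main() {
	bounds := []float64{1, 0.5, 2.5, 1, 5, 2.5} // assumed example input
	sort.Float64s(bounds)                       // sorting makes duplicates adjacent
	dedup := make([]float64, 0, len(bounds))
	prev := math.Inf(-1) // sentinel smaller than any real bound
	for _, b := range bounds {
		if b != prev { // same single comparison as in the function above
			dedup = append(dedup, b)
			prev = b
		}
	}
	fmt.Println(dedup) // [0.5 1 2.5 5]
}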
@@ -655,7 +655,7 @@ type evalCmd struct {
 
 	isRange             bool // if false, instant query
 	fail, warn, ordered bool
-	withNhcb            bool
+	withNHCB            bool
 
 	metrics  map[uint64]labels.Labels
 	expected map[uint64]entry
@@ -1028,7 +1028,7 @@ func (t *test) execInstantEval(cmd *evalCmd, engine promql.QueryEngine) error {
 	if err := t.runInstantQuery(iq, cmd, engine); err != nil {
 		return err
 	}
-	if cmd.withNhcb {
+	if cmd.withNHCB {
 		if !strings.Contains(iq.expr, "_bucket") {
 			return fmt.Errorf("expected '_bucket' in the expression %q", iq.expr)
 		}
@@ -1400,7 +1400,8 @@ func TestNativeHistogramsInRecordingRules(t *testing.T) {
 
 	expHist := hists[0].ToFloat(nil)
 	for _, h := range hists[1:] {
-		expHist, _ = expHist.Add(h.ToFloat(nil))
+		expHist, err = expHist.Add(h.ToFloat(nil))
+		require.NoError(t, err)
 	}
 
 	it := s.Iterator(nil)
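FloatHistogram.Add reports an error (for instance when the two operands cannot be combined), so the test above now asserts on it instead of discarding it. Outside a test, the same pattern would look roughly like this (a and b are assumed *histogram.FloatHistogram values from elsewhere):

sum, err := a.Add(b)
if err != nil {
	return nil, fmt.Errorf("adding histograms: %w", err)
}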
@@ -365,16 +365,22 @@ type bucketLimitAppender struct {
 
 func (app *bucketLimitAppender) AppendHistogram(ref storage.SeriesRef, lset labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) {
 	if h != nil {
+		if len(h.PositiveBuckets)+len(h.NegativeBuckets) > app.limit && h.Schema > histogram.ExponentialSchemaMax {
+			return 0, errBucketLimit
+		}
 		for len(h.PositiveBuckets)+len(h.NegativeBuckets) > app.limit {
-			if h.Schema <= histogram.ExponentialSchemaMin || h.Schema > histogram.ExponentialSchemaMax {
+			if h.Schema <= histogram.ExponentialSchemaMin {
 				return 0, errBucketLimit
 			}
 			h = h.ReduceResolution(h.Schema - 1)
 		}
 	}
 	if fh != nil {
+		if len(fh.PositiveBuckets)+len(fh.NegativeBuckets) > app.limit && fh.Schema > histogram.ExponentialSchemaMax {
+			return 0, errBucketLimit
+		}
 		for len(fh.PositiveBuckets)+len(fh.NegativeBuckets) > app.limit {
-			if fh.Schema <= histogram.ExponentialSchemaMin || fh.Schema > histogram.ExponentialSchemaMax {
+			if fh.Schema <= histogram.ExponentialSchemaMin {
 				return 0, errBucketLimit
 			}
 			fh = fh.ReduceResolution(fh.Schema - 1)
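The intent of the new checks above: a histogram whose schema cannot be reduced (in this revision, a schema above ExponentialSchemaMax, i.e. custom buckets) is rejected as soon as it exceeds the bucket limit, while exponential histograms are reduced to coarser schemas until they fit or reach ExponentialSchemaMin. A standalone restatement of that decision as a sketch; the helper name and error value are assumptions, only the histogram constants and methods come from the code above:

import (
	"errors"

	"github.com/prometheus/prometheus/model/histogram"
)

var errTooManyBuckets = errors.New("histogram has too many buckets and cannot be reduced further")

// capBuckets is an illustrative restatement of the appender logic, not the actual appender.
func capBuckets(h *histogram.Histogram, limit int) (*histogram.Histogram, error) {
	buckets := func() int { return len(h.PositiveBuckets) + len(h.NegativeBuckets) }
	// A custom-buckets schema cannot have its resolution reduced, so an
	// over-limit histogram is rejected outright.
	if buckets() > limit && h.Schema > histogram.ExponentialSchemaMax {
		return nil, errTooManyBuckets
	}
	for buckets() > limit {
		if h.Schema <= histogram.ExponentialSchemaMin {
			return nil, errTooManyBuckets // already at the coarsest exponential schema
		}
		h = h.ReduceResolution(h.Schema - 1) // merge adjacent buckets into a coarser schema
	}
	return h, nil
}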
@@ -199,7 +199,7 @@ func isWholeWhenMultiplied(in float64) bool {
 // we can convert the value to a float64 by subtracting 1 and dividing by 1000.
 func putCustomBound(b *bstream, f float64) {
 	tf := f * 1000
-	// 33554431-1 comes from the maximum that can be stored in a varint in 4
+	// 33554431-1 comes from the maximum that can be stored in a varbit in 4
 	// bytes, other values are stored in 8 bytes anyway.
 	if tf < 0 || tf > 33554430 || !isWholeWhenMultiplied(f) {
 		b.writeBit(zero)
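Where the 33554430 cut-off comes from (an inference from the comments above, not stated explicitly in the code): the reader reconstructs the bound by subtracting 1 and dividing by 1000, so the encoder stores tf+1; assuming the 4-byte varbit form carries 25 payload bits, the largest storable value is 2^25 - 1 = 33554431, which leaves 33554430 as the largest admissible tf:

const (
	maxVarbit4Bytes  = 1<<25 - 1           // 33554431, assuming 25 payload bits in the 4-byte varbit form
	maxCustomBoundTF = maxVarbit4Bytes - 1 // 33554430, the guard used in putCustomBound
)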