diff --git a/promql/bench_test.go b/promql/bench_test.go index 9a85290915..fb3b6ac74b 100644 --- a/promql/bench_test.go +++ b/promql/bench_test.go @@ -323,6 +323,14 @@ func BenchmarkNativeHistograms(b *testing.B) { name: "sum rate with long rate interval", query: "sum(rate(native_histogram_series[20m]))", }, + { + name: "histogram_count with short rate interval", + query: "histogram_count(sum(rate(native_histogram_series[2m])))", + }, + { + name: "histogram_count with long rate interval", + query: "histogram_count(sum(rate(native_histogram_series[20m])))", + }, } opts := promql.EngineOpts{ diff --git a/promql/engine.go b/promql/engine.go index f9d6f16fc7..83e44e61f9 100644 --- a/promql/engine.go +++ b/promql/engine.go @@ -985,6 +985,11 @@ func checkAndExpandSeriesSet(ctx context.Context, expr parser.Expr) (annotations return nil, nil } series, ws, err := expandSeriesSet(ctx, e.UnexpandedSeriesSet) + if e.SkipHistogramBuckets { + for i := range series { + series[i] = newHistogramStatsSeries(series[i]) + } + } e.Series = series return ws, err } @@ -3184,6 +3189,8 @@ func unwrapStepInvariantExpr(e parser.Expr) parser.Expr { // PreprocessExpr wraps all possible step invariant parts of the given expression with // StepInvariantExpr. It also resolves the preprocessors. func PreprocessExpr(expr parser.Expr, start, end time.Time) parser.Expr { + detectHistogramStatsDecoding(expr) + isStepInvariant := preprocessExprHelper(expr, start, end) if isStepInvariant { return newStepInvariantExpr(expr) @@ -3318,8 +3325,50 @@ func setOffsetForAtModifier(evalTime int64, expr parser.Expr) { }) } +// detectHistogramStatsDecoding modifies the expression by setting the +// SkipHistogramBuckets field in those vector selectors for which it is safe to +// return only histogram statistics (sum and count), excluding histogram spans +// and buckets. The function can be treated as an optimization and is not +// required for correctness. +func detectHistogramStatsDecoding(expr parser.Expr) { + parser.Inspect(expr, func(node parser.Node, path []parser.Node) error { + n, ok := (node).(*parser.VectorSelector) + if !ok { + return nil + } + + for _, p := range path { + call, ok := p.(*parser.Call) + if !ok { + continue + } + if call.Func.Name == "histogram_count" || call.Func.Name == "histogram_sum" { + n.SkipHistogramBuckets = true + break + } + if call.Func.Name == "histogram_quantile" || call.Func.Name == "histogram_fraction" { + n.SkipHistogramBuckets = false + break + } + } + return fmt.Errorf("stop") + }) +} + func makeInt64Pointer(val int64) *int64 { valp := new(int64) *valp = val return valp } + +type histogramStatsSeries struct { + storage.Series +} + +func newHistogramStatsSeries(series storage.Series) *histogramStatsSeries { + return &histogramStatsSeries{Series: series} +} + +func (s histogramStatsSeries) Iterator(it chunkenc.Iterator) chunkenc.Iterator { + return NewHistogramStatsIterator(s.Series.Iterator(it)) +} diff --git a/promql/histogram_stats_iterator.go b/promql/histogram_stats_iterator.go new file mode 100644 index 0000000000..dfafea5f8c --- /dev/null +++ b/promql/histogram_stats_iterator.go @@ -0,0 +1,144 @@ +// Copyright 2024 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package promql + +import ( + "github.com/prometheus/prometheus/model/histogram" + "github.com/prometheus/prometheus/model/value" + "github.com/prometheus/prometheus/tsdb/chunkenc" +) + +type histogramStatsIterator struct { + chunkenc.Iterator + + currentH *histogram.Histogram + lastH *histogram.Histogram + + currentFH *histogram.FloatHistogram + lastFH *histogram.FloatHistogram +} + +// NewHistogramStatsIterator creates an iterator which returns histogram objects +// which have only their sum and count values populated. The iterator handles +// counter reset detection internally and sets the counter reset hint accordingly +// in each returned histogram objects. +func NewHistogramStatsIterator(it chunkenc.Iterator) chunkenc.Iterator { + return &histogramStatsIterator{ + Iterator: it, + currentH: &histogram.Histogram{}, + currentFH: &histogram.FloatHistogram{}, + } +} + +// AtHistogram returns the next timestamp/histogram pair. The counter reset +// detection is guaranteed to be correct only when the caller does not switch +// between AtHistogram and AtFloatHistogram calls. +func (f *histogramStatsIterator) AtHistogram(h *histogram.Histogram) (int64, *histogram.Histogram) { + var t int64 + t, f.currentH = f.Iterator.AtHistogram(f.currentH) + if value.IsStaleNaN(f.currentH.Sum) { + f.setLastH(f.currentH) + h = &histogram.Histogram{Sum: f.currentH.Sum} + return t, h + } + + if h == nil { + h = &histogram.Histogram{ + CounterResetHint: f.getResetHint(f.currentH), + Count: f.currentH.Count, + Sum: f.currentH.Sum, + } + f.setLastH(f.currentH) + return t, h + } + + h.CounterResetHint = f.getResetHint(f.currentH) + h.Count = f.currentH.Count + h.Sum = f.currentH.Sum + f.setLastH(f.currentH) + return t, h +} + +// AtFloatHistogram returns the next timestamp/float histogram pair. The counter +// reset detection is guaranteed to be correct only when the caller does not +// switch between AtHistogram and AtFloatHistogram calls. +func (f *histogramStatsIterator) AtFloatHistogram(fh *histogram.FloatHistogram) (int64, *histogram.FloatHistogram) { + var t int64 + t, f.currentFH = f.Iterator.AtFloatHistogram(f.currentFH) + if value.IsStaleNaN(f.currentFH.Sum) { + f.setLastFH(f.currentFH) + return t, &histogram.FloatHistogram{Sum: f.currentFH.Sum} + } + + if fh == nil { + fh = &histogram.FloatHistogram{ + CounterResetHint: f.getFloatResetHint(f.currentFH.CounterResetHint), + Count: f.currentFH.Count, + Sum: f.currentFH.Sum, + } + f.setLastFH(f.currentFH) + return t, fh + } + + fh.CounterResetHint = f.getFloatResetHint(f.currentFH.CounterResetHint) + fh.Count = f.currentFH.Count + fh.Sum = f.currentFH.Sum + f.setLastFH(f.currentFH) + return t, fh +} + +func (f *histogramStatsIterator) setLastH(h *histogram.Histogram) { + if f.lastH == nil { + f.lastH = h.Copy() + } else { + h.CopyTo(f.lastH) + } +} + +func (f *histogramStatsIterator) setLastFH(fh *histogram.FloatHistogram) { + if f.lastFH == nil { + f.lastFH = fh.Copy() + } else { + fh.CopyTo(f.lastFH) + } +} + +func (f *histogramStatsIterator) getFloatResetHint(hint histogram.CounterResetHint) histogram.CounterResetHint { + if hint != histogram.UnknownCounterReset { + return hint + } + if f.lastFH == nil { + return histogram.NotCounterReset + } + + if f.currentFH.DetectReset(f.lastFH) { + return histogram.CounterReset + } + return histogram.NotCounterReset +} + +func (f *histogramStatsIterator) getResetHint(h *histogram.Histogram) histogram.CounterResetHint { + if h.CounterResetHint != histogram.UnknownCounterReset { + return h.CounterResetHint + } + if f.lastH == nil { + return histogram.NotCounterReset + } + + fh, prevFH := h.ToFloat(nil), f.lastH.ToFloat(nil) + if fh.DetectReset(prevFH) { + return histogram.CounterReset + } + return histogram.NotCounterReset +} diff --git a/promql/histogram_stats_iterator_test.go b/promql/histogram_stats_iterator_test.go new file mode 100644 index 0000000000..b71a9d6029 --- /dev/null +++ b/promql/histogram_stats_iterator_test.go @@ -0,0 +1,121 @@ +// Copyright 2015 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package promql + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "github.com/prometheus/prometheus/model/histogram" + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/tsdb/chunkenc" + "github.com/prometheus/prometheus/tsdb/tsdbutil" +) + +func TestHistogramStatsDecoding(t *testing.T) { + histograms := []*histogram.Histogram{ + tsdbutil.GenerateTestHistogram(0), + tsdbutil.GenerateTestHistogram(1), + tsdbutil.GenerateTestHistogram(2), + tsdbutil.GenerateTestHistogram(2), + } + histograms[0].CounterResetHint = histogram.NotCounterReset + histograms[1].CounterResetHint = histogram.UnknownCounterReset + histograms[2].CounterResetHint = histogram.CounterReset + histograms[3].CounterResetHint = histogram.UnknownCounterReset + + expectedHints := []histogram.CounterResetHint{ + histogram.NotCounterReset, + histogram.NotCounterReset, + histogram.CounterReset, + histogram.NotCounterReset, + } + + t.Run("histogram_stats", func(t *testing.T) { + decodedStats := make([]*histogram.Histogram, 0) + statsIterator := NewHistogramStatsIterator(newHistogramSeries(histograms).Iterator(nil)) + for statsIterator.Next() != chunkenc.ValNone { + _, h := statsIterator.AtHistogram(nil) + decodedStats = append(decodedStats, h) + } + for i := 0; i < len(histograms); i++ { + require.Equal(t, expectedHints[i], decodedStats[i].CounterResetHint) + require.Equal(t, histograms[i].Count, decodedStats[i].Count) + require.Equal(t, histograms[i].Sum, decodedStats[i].Sum) + } + }) + t.Run("float_histogram_stats", func(t *testing.T) { + decodedStats := make([]*histogram.FloatHistogram, 0) + statsIterator := NewHistogramStatsIterator(newHistogramSeries(histograms).Iterator(nil)) + for statsIterator.Next() != chunkenc.ValNone { + _, h := statsIterator.AtFloatHistogram(nil) + decodedStats = append(decodedStats, h) + } + for i := 0; i < len(histograms); i++ { + fh := histograms[i].ToFloat(nil) + require.Equal(t, expectedHints[i], decodedStats[i].CounterResetHint) + require.Equal(t, fh.Count, decodedStats[i].Count) + require.Equal(t, fh.Sum, decodedStats[i].Sum) + } + }) +} + +type histogramSeries struct { + histograms []*histogram.Histogram +} + +func newHistogramSeries(histograms []*histogram.Histogram) *histogramSeries { + return &histogramSeries{ + histograms: histograms, + } +} + +func (m histogramSeries) Labels() labels.Labels { return labels.EmptyLabels() } + +func (m histogramSeries) Iterator(_ chunkenc.Iterator) chunkenc.Iterator { + return &histogramIterator{ + i: -1, + histograms: m.histograms, + } +} + +type histogramIterator struct { + i int + histograms []*histogram.Histogram +} + +func (h *histogramIterator) Next() chunkenc.ValueType { + h.i++ + if h.i < len(h.histograms) { + return chunkenc.ValHistogram + } + return chunkenc.ValNone +} + +func (h *histogramIterator) Seek(t int64) chunkenc.ValueType { panic("not implemented") } + +func (h *histogramIterator) At() (int64, float64) { panic("not implemented") } + +func (h *histogramIterator) AtHistogram(_ *histogram.Histogram) (int64, *histogram.Histogram) { + return 0, h.histograms[h.i] +} + +func (h *histogramIterator) AtFloatHistogram(_ *histogram.FloatHistogram) (int64, *histogram.FloatHistogram) { + return 0, h.histograms[h.i].ToFloat(nil) +} + +func (h *histogramIterator) AtT() int64 { return 0 } + +func (h *histogramIterator) Err() error { return nil } diff --git a/promql/parser/ast.go b/promql/parser/ast.go index 379352599d..830e8a2c5e 100644 --- a/promql/parser/ast.go +++ b/promql/parser/ast.go @@ -198,10 +198,11 @@ type VectorSelector struct { // Offset is the offset used during the query execution // which is calculated using the original offset, at modifier time, // eval time, and subquery offsets in the AST tree. - Offset time.Duration - Timestamp *int64 - StartOrEnd ItemType // Set when @ is used with start() or end() - LabelMatchers []*labels.Matcher + Offset time.Duration + Timestamp *int64 + SkipHistogramBuckets bool // Set when decoding native histogram buckets is not needed for query evaluation. + StartOrEnd ItemType // Set when @ is used with start() or end() + LabelMatchers []*labels.Matcher // The unexpanded seriesSet populated at query preparation time. UnexpandedSeriesSet storage.SeriesSet