mirror of
https://github.com/prometheus/prometheus.git
synced 2025-02-02 08:31:11 -08:00
Implement histogram statistics decoder (#14097)
Implement histogram statistics decoder This commit speeds up histogram_count and histogram_sum functions on native histograms. The idea is to have separate decoders which can be used by the engine to only read count/sum values from histogram objects. This should help with reducing allocations when decoding histograms, as well as with speeding up aggregations like sum since they will be done on floats and not on histogram objects. Signed-off-by: Filip Petkovski <filip.petkovsky@gmail.com> --------- Signed-off-by: Filip Petkovski <filip.petkovsky@gmail.com> Co-authored-by: Anthony Mirabella <a9@aneurysm9.com>
This commit is contained in:
parent
ee0527e0c6
commit
6e68046c25
|
@ -323,6 +323,14 @@ func BenchmarkNativeHistograms(b *testing.B) {
|
|||
name: "sum rate with long rate interval",
|
||||
query: "sum(rate(native_histogram_series[20m]))",
|
||||
},
|
||||
{
|
||||
name: "histogram_count with short rate interval",
|
||||
query: "histogram_count(sum(rate(native_histogram_series[2m])))",
|
||||
},
|
||||
{
|
||||
name: "histogram_count with long rate interval",
|
||||
query: "histogram_count(sum(rate(native_histogram_series[20m])))",
|
||||
},
|
||||
}
|
||||
|
||||
opts := promql.EngineOpts{
|
||||
|
|
|
@ -985,6 +985,11 @@ func checkAndExpandSeriesSet(ctx context.Context, expr parser.Expr) (annotations
|
|||
return nil, nil
|
||||
}
|
||||
series, ws, err := expandSeriesSet(ctx, e.UnexpandedSeriesSet)
|
||||
if e.SkipHistogramBuckets {
|
||||
for i := range series {
|
||||
series[i] = newHistogramStatsSeries(series[i])
|
||||
}
|
||||
}
|
||||
e.Series = series
|
||||
return ws, err
|
||||
}
|
||||
|
@ -3184,6 +3189,8 @@ func unwrapStepInvariantExpr(e parser.Expr) parser.Expr {
|
|||
// PreprocessExpr wraps all possible step invariant parts of the given expression with
|
||||
// StepInvariantExpr. It also resolves the preprocessors.
|
||||
func PreprocessExpr(expr parser.Expr, start, end time.Time) parser.Expr {
|
||||
detectHistogramStatsDecoding(expr)
|
||||
|
||||
isStepInvariant := preprocessExprHelper(expr, start, end)
|
||||
if isStepInvariant {
|
||||
return newStepInvariantExpr(expr)
|
||||
|
@ -3318,8 +3325,50 @@ func setOffsetForAtModifier(evalTime int64, expr parser.Expr) {
|
|||
})
|
||||
}
|
||||
|
||||
// detectHistogramStatsDecoding modifies the expression by setting the
|
||||
// SkipHistogramBuckets field in those vector selectors for which it is safe to
|
||||
// return only histogram statistics (sum and count), excluding histogram spans
|
||||
// and buckets. The function can be treated as an optimization and is not
|
||||
// required for correctness.
|
||||
func detectHistogramStatsDecoding(expr parser.Expr) {
|
||||
parser.Inspect(expr, func(node parser.Node, path []parser.Node) error {
|
||||
n, ok := (node).(*parser.VectorSelector)
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
|
||||
for _, p := range path {
|
||||
call, ok := p.(*parser.Call)
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
if call.Func.Name == "histogram_count" || call.Func.Name == "histogram_sum" {
|
||||
n.SkipHistogramBuckets = true
|
||||
break
|
||||
}
|
||||
if call.Func.Name == "histogram_quantile" || call.Func.Name == "histogram_fraction" {
|
||||
n.SkipHistogramBuckets = false
|
||||
break
|
||||
}
|
||||
}
|
||||
return fmt.Errorf("stop")
|
||||
})
|
||||
}
|
||||
|
||||
func makeInt64Pointer(val int64) *int64 {
|
||||
valp := new(int64)
|
||||
*valp = val
|
||||
return valp
|
||||
}
|
||||
|
||||
type histogramStatsSeries struct {
|
||||
storage.Series
|
||||
}
|
||||
|
||||
func newHistogramStatsSeries(series storage.Series) *histogramStatsSeries {
|
||||
return &histogramStatsSeries{Series: series}
|
||||
}
|
||||
|
||||
func (s histogramStatsSeries) Iterator(it chunkenc.Iterator) chunkenc.Iterator {
|
||||
return NewHistogramStatsIterator(s.Series.Iterator(it))
|
||||
}
|
||||
|
|
144
promql/histogram_stats_iterator.go
Normal file
144
promql/histogram_stats_iterator.go
Normal file
|
@ -0,0 +1,144 @@
|
|||
// Copyright 2024 The Prometheus Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package promql
|
||||
|
||||
import (
|
||||
"github.com/prometheus/prometheus/model/histogram"
|
||||
"github.com/prometheus/prometheus/model/value"
|
||||
"github.com/prometheus/prometheus/tsdb/chunkenc"
|
||||
)
|
||||
|
||||
type histogramStatsIterator struct {
|
||||
chunkenc.Iterator
|
||||
|
||||
currentH *histogram.Histogram
|
||||
lastH *histogram.Histogram
|
||||
|
||||
currentFH *histogram.FloatHistogram
|
||||
lastFH *histogram.FloatHistogram
|
||||
}
|
||||
|
||||
// NewHistogramStatsIterator creates an iterator which returns histogram objects
|
||||
// which have only their sum and count values populated. The iterator handles
|
||||
// counter reset detection internally and sets the counter reset hint accordingly
|
||||
// in each returned histogram objects.
|
||||
func NewHistogramStatsIterator(it chunkenc.Iterator) chunkenc.Iterator {
|
||||
return &histogramStatsIterator{
|
||||
Iterator: it,
|
||||
currentH: &histogram.Histogram{},
|
||||
currentFH: &histogram.FloatHistogram{},
|
||||
}
|
||||
}
|
||||
|
||||
// AtHistogram returns the next timestamp/histogram pair. The counter reset
|
||||
// detection is guaranteed to be correct only when the caller does not switch
|
||||
// between AtHistogram and AtFloatHistogram calls.
|
||||
func (f *histogramStatsIterator) AtHistogram(h *histogram.Histogram) (int64, *histogram.Histogram) {
|
||||
var t int64
|
||||
t, f.currentH = f.Iterator.AtHistogram(f.currentH)
|
||||
if value.IsStaleNaN(f.currentH.Sum) {
|
||||
f.setLastH(f.currentH)
|
||||
h = &histogram.Histogram{Sum: f.currentH.Sum}
|
||||
return t, h
|
||||
}
|
||||
|
||||
if h == nil {
|
||||
h = &histogram.Histogram{
|
||||
CounterResetHint: f.getResetHint(f.currentH),
|
||||
Count: f.currentH.Count,
|
||||
Sum: f.currentH.Sum,
|
||||
}
|
||||
f.setLastH(f.currentH)
|
||||
return t, h
|
||||
}
|
||||
|
||||
h.CounterResetHint = f.getResetHint(f.currentH)
|
||||
h.Count = f.currentH.Count
|
||||
h.Sum = f.currentH.Sum
|
||||
f.setLastH(f.currentH)
|
||||
return t, h
|
||||
}
|
||||
|
||||
// AtFloatHistogram returns the next timestamp/float histogram pair. The counter
|
||||
// reset detection is guaranteed to be correct only when the caller does not
|
||||
// switch between AtHistogram and AtFloatHistogram calls.
|
||||
func (f *histogramStatsIterator) AtFloatHistogram(fh *histogram.FloatHistogram) (int64, *histogram.FloatHistogram) {
|
||||
var t int64
|
||||
t, f.currentFH = f.Iterator.AtFloatHistogram(f.currentFH)
|
||||
if value.IsStaleNaN(f.currentFH.Sum) {
|
||||
f.setLastFH(f.currentFH)
|
||||
return t, &histogram.FloatHistogram{Sum: f.currentFH.Sum}
|
||||
}
|
||||
|
||||
if fh == nil {
|
||||
fh = &histogram.FloatHistogram{
|
||||
CounterResetHint: f.getFloatResetHint(f.currentFH.CounterResetHint),
|
||||
Count: f.currentFH.Count,
|
||||
Sum: f.currentFH.Sum,
|
||||
}
|
||||
f.setLastFH(f.currentFH)
|
||||
return t, fh
|
||||
}
|
||||
|
||||
fh.CounterResetHint = f.getFloatResetHint(f.currentFH.CounterResetHint)
|
||||
fh.Count = f.currentFH.Count
|
||||
fh.Sum = f.currentFH.Sum
|
||||
f.setLastFH(f.currentFH)
|
||||
return t, fh
|
||||
}
|
||||
|
||||
func (f *histogramStatsIterator) setLastH(h *histogram.Histogram) {
|
||||
if f.lastH == nil {
|
||||
f.lastH = h.Copy()
|
||||
} else {
|
||||
h.CopyTo(f.lastH)
|
||||
}
|
||||
}
|
||||
|
||||
func (f *histogramStatsIterator) setLastFH(fh *histogram.FloatHistogram) {
|
||||
if f.lastFH == nil {
|
||||
f.lastFH = fh.Copy()
|
||||
} else {
|
||||
fh.CopyTo(f.lastFH)
|
||||
}
|
||||
}
|
||||
|
||||
func (f *histogramStatsIterator) getFloatResetHint(hint histogram.CounterResetHint) histogram.CounterResetHint {
|
||||
if hint != histogram.UnknownCounterReset {
|
||||
return hint
|
||||
}
|
||||
if f.lastFH == nil {
|
||||
return histogram.NotCounterReset
|
||||
}
|
||||
|
||||
if f.currentFH.DetectReset(f.lastFH) {
|
||||
return histogram.CounterReset
|
||||
}
|
||||
return histogram.NotCounterReset
|
||||
}
|
||||
|
||||
func (f *histogramStatsIterator) getResetHint(h *histogram.Histogram) histogram.CounterResetHint {
|
||||
if h.CounterResetHint != histogram.UnknownCounterReset {
|
||||
return h.CounterResetHint
|
||||
}
|
||||
if f.lastH == nil {
|
||||
return histogram.NotCounterReset
|
||||
}
|
||||
|
||||
fh, prevFH := h.ToFloat(nil), f.lastH.ToFloat(nil)
|
||||
if fh.DetectReset(prevFH) {
|
||||
return histogram.CounterReset
|
||||
}
|
||||
return histogram.NotCounterReset
|
||||
}
|
121
promql/histogram_stats_iterator_test.go
Normal file
121
promql/histogram_stats_iterator_test.go
Normal file
|
@ -0,0 +1,121 @@
|
|||
// Copyright 2015 The Prometheus Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package promql
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/prometheus/prometheus/model/histogram"
|
||||
"github.com/prometheus/prometheus/model/labels"
|
||||
"github.com/prometheus/prometheus/tsdb/chunkenc"
|
||||
"github.com/prometheus/prometheus/tsdb/tsdbutil"
|
||||
)
|
||||
|
||||
func TestHistogramStatsDecoding(t *testing.T) {
|
||||
histograms := []*histogram.Histogram{
|
||||
tsdbutil.GenerateTestHistogram(0),
|
||||
tsdbutil.GenerateTestHistogram(1),
|
||||
tsdbutil.GenerateTestHistogram(2),
|
||||
tsdbutil.GenerateTestHistogram(2),
|
||||
}
|
||||
histograms[0].CounterResetHint = histogram.NotCounterReset
|
||||
histograms[1].CounterResetHint = histogram.UnknownCounterReset
|
||||
histograms[2].CounterResetHint = histogram.CounterReset
|
||||
histograms[3].CounterResetHint = histogram.UnknownCounterReset
|
||||
|
||||
expectedHints := []histogram.CounterResetHint{
|
||||
histogram.NotCounterReset,
|
||||
histogram.NotCounterReset,
|
||||
histogram.CounterReset,
|
||||
histogram.NotCounterReset,
|
||||
}
|
||||
|
||||
t.Run("histogram_stats", func(t *testing.T) {
|
||||
decodedStats := make([]*histogram.Histogram, 0)
|
||||
statsIterator := NewHistogramStatsIterator(newHistogramSeries(histograms).Iterator(nil))
|
||||
for statsIterator.Next() != chunkenc.ValNone {
|
||||
_, h := statsIterator.AtHistogram(nil)
|
||||
decodedStats = append(decodedStats, h)
|
||||
}
|
||||
for i := 0; i < len(histograms); i++ {
|
||||
require.Equal(t, expectedHints[i], decodedStats[i].CounterResetHint)
|
||||
require.Equal(t, histograms[i].Count, decodedStats[i].Count)
|
||||
require.Equal(t, histograms[i].Sum, decodedStats[i].Sum)
|
||||
}
|
||||
})
|
||||
t.Run("float_histogram_stats", func(t *testing.T) {
|
||||
decodedStats := make([]*histogram.FloatHistogram, 0)
|
||||
statsIterator := NewHistogramStatsIterator(newHistogramSeries(histograms).Iterator(nil))
|
||||
for statsIterator.Next() != chunkenc.ValNone {
|
||||
_, h := statsIterator.AtFloatHistogram(nil)
|
||||
decodedStats = append(decodedStats, h)
|
||||
}
|
||||
for i := 0; i < len(histograms); i++ {
|
||||
fh := histograms[i].ToFloat(nil)
|
||||
require.Equal(t, expectedHints[i], decodedStats[i].CounterResetHint)
|
||||
require.Equal(t, fh.Count, decodedStats[i].Count)
|
||||
require.Equal(t, fh.Sum, decodedStats[i].Sum)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
type histogramSeries struct {
|
||||
histograms []*histogram.Histogram
|
||||
}
|
||||
|
||||
func newHistogramSeries(histograms []*histogram.Histogram) *histogramSeries {
|
||||
return &histogramSeries{
|
||||
histograms: histograms,
|
||||
}
|
||||
}
|
||||
|
||||
func (m histogramSeries) Labels() labels.Labels { return labels.EmptyLabels() }
|
||||
|
||||
func (m histogramSeries) Iterator(_ chunkenc.Iterator) chunkenc.Iterator {
|
||||
return &histogramIterator{
|
||||
i: -1,
|
||||
histograms: m.histograms,
|
||||
}
|
||||
}
|
||||
|
||||
type histogramIterator struct {
|
||||
i int
|
||||
histograms []*histogram.Histogram
|
||||
}
|
||||
|
||||
func (h *histogramIterator) Next() chunkenc.ValueType {
|
||||
h.i++
|
||||
if h.i < len(h.histograms) {
|
||||
return chunkenc.ValHistogram
|
||||
}
|
||||
return chunkenc.ValNone
|
||||
}
|
||||
|
||||
func (h *histogramIterator) Seek(t int64) chunkenc.ValueType { panic("not implemented") }
|
||||
|
||||
func (h *histogramIterator) At() (int64, float64) { panic("not implemented") }
|
||||
|
||||
func (h *histogramIterator) AtHistogram(_ *histogram.Histogram) (int64, *histogram.Histogram) {
|
||||
return 0, h.histograms[h.i]
|
||||
}
|
||||
|
||||
func (h *histogramIterator) AtFloatHistogram(_ *histogram.FloatHistogram) (int64, *histogram.FloatHistogram) {
|
||||
return 0, h.histograms[h.i].ToFloat(nil)
|
||||
}
|
||||
|
||||
func (h *histogramIterator) AtT() int64 { return 0 }
|
||||
|
||||
func (h *histogramIterator) Err() error { return nil }
|
|
@ -198,10 +198,11 @@ type VectorSelector struct {
|
|||
// Offset is the offset used during the query execution
|
||||
// which is calculated using the original offset, at modifier time,
|
||||
// eval time, and subquery offsets in the AST tree.
|
||||
Offset time.Duration
|
||||
Timestamp *int64
|
||||
StartOrEnd ItemType // Set when @ is used with start() or end()
|
||||
LabelMatchers []*labels.Matcher
|
||||
Offset time.Duration
|
||||
Timestamp *int64
|
||||
SkipHistogramBuckets bool // Set when decoding native histogram buckets is not needed for query evaluation.
|
||||
StartOrEnd ItemType // Set when @ is used with start() or end()
|
||||
LabelMatchers []*labels.Matcher
|
||||
|
||||
// The unexpanded seriesSet populated at query preparation time.
|
||||
UnexpandedSeriesSet storage.SeriesSet
|
||||
|
|
Loading…
Reference in a new issue