Optimize histogram iterators (#13340)

Optimize histogram iterators

Histogram iterators allocate new objects in the AtHistogram and
AtFloatHistogram methods, which makes calculating rates over long
ranges expensive.

In #13215 we allowed an existing object to be reused
when converting an integer histogram to a float histogram. This commit follows
the same idea and allows injecting an existing object in the AtHistogram and
AtFloatHistogram methods. When the injected value is nil, iterators allocate
new histograms, otherwise they populate and return the injected object.

The commit also adds a CopyTo method to Histogram and FloatHistogram which
is used in the BufferedIterator to overwrite items in the ring instead of making
new copies.

Note that a specialized HPoint pool is needed for all of this to work 
(`matrixSelectorHPool`).

---------

Signed-off-by: Filip Petkovski <filip.petkovsky@gmail.com>
Co-authored-by: George Krajcsovits <krajorama@users.noreply.github.com>
This commit is contained in:
Filip Petkovski 2024-01-23 17:02:14 +01:00 committed by GitHub
parent 78411d5e8b
commit 583f3e587c
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
31 changed files with 621 additions and 186 deletions

View file

@ -667,7 +667,7 @@ func analyzeCompaction(ctx context.Context, block tsdb.BlockReader, indexr tsdb.
it := fhchk.Iterator(nil)
bucketCount := 0
for it.Next() == chunkenc.ValFloatHistogram {
_, f := it.AtFloatHistogram()
_, f := it.AtFloatHistogram(nil)
bucketCount += len(f.PositiveBuckets)
bucketCount += len(f.NegativeBuckets)
}
@ -682,7 +682,7 @@ func analyzeCompaction(ctx context.Context, block tsdb.BlockReader, indexr tsdb.
it := hchk.Iterator(nil)
bucketCount := 0
for it.Next() == chunkenc.ValHistogram {
_, f := it.AtHistogram()
_, f := it.AtHistogram(nil)
bucketCount += len(f.PositiveBuckets)
bucketCount += len(f.NegativeBuckets)
}
@ -745,11 +745,11 @@ func dumpSamples(ctx context.Context, path string, mint, maxt int64, match []str
fmt.Printf("%s %g %d\n", lbs, val, ts)
}
for it.Next() == chunkenc.ValFloatHistogram {
ts, fh := it.AtFloatHistogram()
ts, fh := it.AtFloatHistogram(nil)
fmt.Printf("%s %s %d\n", lbs, fh.String(), ts)
}
for it.Next() == chunkenc.ValHistogram {
ts, h := it.AtHistogram()
ts, h := it.AtHistogram(nil)
fmt.Printf("%s %s %d\n", lbs, h.String(), ts)
}
if it.Err() != nil {

View file

@ -53,21 +53,28 @@ type FloatHistogram struct {
// Copy returns a deep copy of the Histogram.
func (h *FloatHistogram) Copy() *FloatHistogram {
c := *h
c := FloatHistogram{
CounterResetHint: h.CounterResetHint,
Schema: h.Schema,
ZeroThreshold: h.ZeroThreshold,
ZeroCount: h.ZeroCount,
Count: h.Count,
Sum: h.Sum,
}
if h.PositiveSpans != nil {
if len(h.PositiveSpans) != 0 {
c.PositiveSpans = make([]Span, len(h.PositiveSpans))
copy(c.PositiveSpans, h.PositiveSpans)
}
if h.NegativeSpans != nil {
if len(h.NegativeSpans) != 0 {
c.NegativeSpans = make([]Span, len(h.NegativeSpans))
copy(c.NegativeSpans, h.NegativeSpans)
}
if h.PositiveBuckets != nil {
if len(h.PositiveBuckets) != 0 {
c.PositiveBuckets = make([]float64, len(h.PositiveBuckets))
copy(c.PositiveBuckets, h.PositiveBuckets)
}
if h.NegativeBuckets != nil {
if len(h.NegativeBuckets) != 0 {
c.NegativeBuckets = make([]float64, len(h.NegativeBuckets))
copy(c.NegativeBuckets, h.NegativeBuckets)
}
@ -75,6 +82,29 @@ func (h *FloatHistogram) Copy() *FloatHistogram {
return &c
}
// CopyTo makes a deep copy into the given FloatHistogram.
// The destination object has to be a non-nil pointer.
func (h *FloatHistogram) CopyTo(to *FloatHistogram) {
to.CounterResetHint = h.CounterResetHint
to.Schema = h.Schema
to.ZeroThreshold = h.ZeroThreshold
to.ZeroCount = h.ZeroCount
to.Count = h.Count
to.Sum = h.Sum
to.PositiveSpans = resize(to.PositiveSpans, len(h.PositiveSpans))
copy(to.PositiveSpans, h.PositiveSpans)
to.NegativeSpans = resize(to.NegativeSpans, len(h.NegativeSpans))
copy(to.NegativeSpans, h.NegativeSpans)
to.PositiveBuckets = resize(to.PositiveBuckets, len(h.PositiveBuckets))
copy(to.PositiveBuckets, h.PositiveBuckets)
to.NegativeBuckets = resize(to.NegativeBuckets, len(h.NegativeBuckets))
copy(to.NegativeBuckets, h.NegativeBuckets)
}
// CopyToSchema works like Copy, but the returned deep copy has the provided
// target schema, which must be ≤ the original schema (i.e. it must have a lower
// resolution).

View file

@ -142,6 +142,118 @@ func TestFloatHistogramMul(t *testing.T) {
}
}
func TestFloatHistogramCopy(t *testing.T) {
cases := []struct {
name string
orig *FloatHistogram
expected *FloatHistogram
}{
{
name: "without buckets",
orig: &FloatHistogram{},
expected: &FloatHistogram{},
},
{
name: "with buckets",
orig: &FloatHistogram{
PositiveSpans: []Span{{-2, 1}},
PositiveBuckets: []float64{1, 3, -3, 42},
NegativeSpans: []Span{{3, 2}},
NegativeBuckets: []float64{5, 3, 1.234e5, 1000},
},
expected: &FloatHistogram{
PositiveSpans: []Span{{-2, 1}},
PositiveBuckets: []float64{1, 3, -3, 42},
NegativeSpans: []Span{{3, 2}},
NegativeBuckets: []float64{5, 3, 1.234e5, 1000},
},
},
{
name: "with empty buckets and non empty capacity",
orig: &FloatHistogram{
PositiveSpans: make([]Span, 0, 1),
PositiveBuckets: make([]float64, 0, 1),
NegativeSpans: make([]Span, 0, 1),
NegativeBuckets: make([]float64, 0, 1),
},
expected: &FloatHistogram{},
},
}
for _, tcase := range cases {
t.Run(tcase.name, func(t *testing.T) {
hCopy := tcase.orig.Copy()
// Modify a primitive value in the original histogram.
tcase.orig.Sum++
require.Equal(t, tcase.expected, hCopy)
assertDeepCopyFHSpans(t, tcase.orig, hCopy, tcase.expected)
})
}
}
func TestFloatHistogramCopyTo(t *testing.T) {
cases := []struct {
name string
orig *FloatHistogram
expected *FloatHistogram
}{
{
name: "without buckets",
orig: &FloatHistogram{},
expected: &FloatHistogram{},
},
{
name: "with buckets",
orig: &FloatHistogram{
PositiveSpans: []Span{{-2, 1}},
PositiveBuckets: []float64{1, 3, -3, 42},
NegativeSpans: []Span{{3, 2}},
NegativeBuckets: []float64{5, 3, 1.234e5, 1000},
},
expected: &FloatHistogram{
PositiveSpans: []Span{{-2, 1}},
PositiveBuckets: []float64{1, 3, -3, 42},
NegativeSpans: []Span{{3, 2}},
NegativeBuckets: []float64{5, 3, 1.234e5, 1000},
},
},
{
name: "with empty buckets and non empty capacity",
orig: &FloatHistogram{
PositiveSpans: make([]Span, 0, 1),
PositiveBuckets: make([]float64, 0, 1),
NegativeSpans: make([]Span, 0, 1),
NegativeBuckets: make([]float64, 0, 1),
},
expected: &FloatHistogram{},
},
}
for _, tcase := range cases {
t.Run(tcase.name, func(t *testing.T) {
hCopy := &FloatHistogram{}
tcase.orig.CopyTo(hCopy)
// Modify a primitive value in the original histogram.
tcase.orig.Sum++
require.Equal(t, tcase.expected, hCopy)
assertDeepCopyFHSpans(t, tcase.orig, hCopy, tcase.expected)
})
}
}
func assertDeepCopyFHSpans(t *testing.T, orig, hCopy, expected *FloatHistogram) {
// Do an in-place expansion of an original spans slice.
orig.PositiveSpans = expandSpans(orig.PositiveSpans)
orig.PositiveSpans[len(orig.PositiveSpans)-1] = Span{1, 2}
hCopy.PositiveSpans = expandSpans(hCopy.PositiveSpans)
expected.PositiveSpans = expandSpans(expected.PositiveSpans)
// Expand the copy spans and assert that modifying the original has not affected the copy.
require.Equal(t, expected, hCopy)
}
func TestFloatHistogramDiv(t *testing.T) {
cases := []struct {
name string

View file

@ -83,7 +83,14 @@ type Span struct {
// Copy returns a deep copy of the Histogram.
func (h *Histogram) Copy() *Histogram {
c := *h
c := Histogram{
CounterResetHint: h.CounterResetHint,
Schema: h.Schema,
ZeroThreshold: h.ZeroThreshold,
ZeroCount: h.ZeroCount,
Count: h.Count,
Sum: h.Sum,
}
if len(h.PositiveSpans) != 0 {
c.PositiveSpans = make([]Span, len(h.PositiveSpans))
@ -105,6 +112,29 @@ func (h *Histogram) Copy() *Histogram {
return &c
}
// CopyTo makes a deep copy into the given Histogram object.
// The destination object has to be a non-nil pointer.
func (h *Histogram) CopyTo(to *Histogram) {
to.CounterResetHint = h.CounterResetHint
to.Schema = h.Schema
to.ZeroThreshold = h.ZeroThreshold
to.ZeroCount = h.ZeroCount
to.Count = h.Count
to.Sum = h.Sum
to.PositiveSpans = resize(to.PositiveSpans, len(h.PositiveSpans))
copy(to.PositiveSpans, h.PositiveSpans)
to.NegativeSpans = resize(to.NegativeSpans, len(h.NegativeSpans))
copy(to.NegativeSpans, h.NegativeSpans)
to.PositiveBuckets = resize(to.PositiveBuckets, len(h.PositiveBuckets))
copy(to.PositiveBuckets, h.PositiveBuckets)
to.NegativeBuckets = resize(to.NegativeBuckets, len(h.NegativeBuckets))
copy(to.NegativeBuckets, h.NegativeBuckets)
}
// String returns a string representation of the Histogram.
func (h *Histogram) String() string {
var sb strings.Builder

View file

@ -604,6 +604,128 @@ func TestHistogramEquals(t *testing.T) {
notEquals(*hStale, *hNaN)
}
func TestHistogramCopy(t *testing.T) {
cases := []struct {
name string
orig *Histogram
expected *Histogram
}{
{
name: "without buckets",
orig: &Histogram{},
expected: &Histogram{},
},
{
name: "with buckets",
orig: &Histogram{
PositiveSpans: []Span{{-2, 1}},
PositiveBuckets: []int64{1, 3, -3, 42},
NegativeSpans: []Span{{3, 2}},
NegativeBuckets: []int64{5, 3, 1.234e5, 1000},
},
expected: &Histogram{
PositiveSpans: []Span{{-2, 1}},
PositiveBuckets: []int64{1, 3, -3, 42},
NegativeSpans: []Span{{3, 2}},
NegativeBuckets: []int64{5, 3, 1.234e5, 1000},
},
},
{
name: "with empty buckets and non empty capacity",
orig: &Histogram{
PositiveSpans: make([]Span, 0, 1),
PositiveBuckets: make([]int64, 0, 1),
NegativeSpans: make([]Span, 0, 1),
NegativeBuckets: make([]int64, 0, 1),
},
expected: &Histogram{},
},
}
for _, tcase := range cases {
t.Run(tcase.name, func(t *testing.T) {
hCopy := tcase.orig.Copy()
// Modify a primitive value in the original histogram.
tcase.orig.Sum++
require.Equal(t, tcase.expected, hCopy)
assertDeepCopyHSpans(t, tcase.orig, hCopy, tcase.expected)
})
}
}
func TestHistogramCopyTo(t *testing.T) {
cases := []struct {
name string
orig *Histogram
expected *Histogram
}{
{
name: "without buckets",
orig: &Histogram{},
expected: &Histogram{},
},
{
name: "with buckets",
orig: &Histogram{
PositiveSpans: []Span{{-2, 1}},
PositiveBuckets: []int64{1, 3, -3, 42},
NegativeSpans: []Span{{3, 2}},
NegativeBuckets: []int64{5, 3, 1.234e5, 1000},
},
expected: &Histogram{
PositiveSpans: []Span{{-2, 1}},
PositiveBuckets: []int64{1, 3, -3, 42},
NegativeSpans: []Span{{3, 2}},
NegativeBuckets: []int64{5, 3, 1.234e5, 1000},
},
},
{
name: "with empty buckets and non empty capacity",
orig: &Histogram{
PositiveSpans: make([]Span, 0, 1),
PositiveBuckets: make([]int64, 0, 1),
NegativeSpans: make([]Span, 0, 1),
NegativeBuckets: make([]int64, 0, 1),
},
expected: &Histogram{},
},
}
for _, tcase := range cases {
t.Run(tcase.name, func(t *testing.T) {
hCopy := &Histogram{}
tcase.orig.CopyTo(hCopy)
// Modify a primitive value in the original histogram.
tcase.orig.Sum++
require.Equal(t, tcase.expected, hCopy)
assertDeepCopyHSpans(t, tcase.orig, hCopy, tcase.expected)
})
}
}
func assertDeepCopyHSpans(t *testing.T, orig, hCopy, expected *Histogram) {
// Do an in-place expansion of an original spans slice.
orig.PositiveSpans = expandSpans(orig.PositiveSpans)
orig.PositiveSpans[len(orig.PositiveSpans)-1] = Span{1, 2}
hCopy.PositiveSpans = expandSpans(hCopy.PositiveSpans)
expected.PositiveSpans = expandSpans(expected.PositiveSpans)
// Expand the copy spans and assert that modifying the original has not affected the copy.
require.Equal(t, expected, hCopy)
}
func expandSpans(spans []Span) []Span {
n := len(spans)
if cap(spans) > n {
spans = spans[:n+1]
} else {
spans = append(spans, Span{})
}
return spans
}
func TestHistogramCompact(t *testing.T) {
cases := []struct {
name string

View file

@ -66,6 +66,9 @@ const (
// The getHPointSlice and getFPointSlice functions are called with an estimated size which often can be
// over-estimated.
maxPointsSliceSize = 5000
// The default buffer size for points used by the matrix selector.
matrixSelectorSliceSize = 16
)
type engineMetrics struct {
@ -1564,7 +1567,7 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, annotations.Annotatio
ev.currentSamples -= len(floats) + totalHPointSize(histograms)
putFPointSlice(floats)
putHPointSlice(histograms)
putMatrixSelectorHPointSlice(histograms)
// The absent_over_time function returns 0 or 1 series. So far, the matrix
// contains multiple series. The following code will create a new series
@ -1940,6 +1943,13 @@ func (ev *evaluator) vectorSelectorSingle(it *storage.MemoizedSeriesIterator, no
var (
fPointPool zeropool.Pool[[]FPoint]
hPointPool zeropool.Pool[[]HPoint]
// matrixSelectorHPool holds reusable histogram slices used by the matrix
// selector. The key difference between this pool and the hPointPool is that
// slices returned by this pool should never hold multiple copies of the same
// histogram pointer since histogram objects are reused across query evaluation
// steps.
matrixSelectorHPool zeropool.Pool[[]HPoint]
)
func getFPointSlice(sz int) []FPoint {
@ -1982,6 +1992,20 @@ func putHPointSlice(p []HPoint) {
}
}
func getMatrixSelectorHPoints() []HPoint {
if p := matrixSelectorHPool.Get(); p != nil {
return p
}
return make([]HPoint, 0, matrixSelectorSliceSize)
}
func putMatrixSelectorHPointSlice(p []HPoint) {
if p != nil {
matrixSelectorHPool.Put(p[:0])
}
}
// matrixSelector evaluates a *parser.MatrixSelector expression.
func (ev *evaluator) matrixSelector(node *parser.MatrixSelector) (Matrix, annotations.Annotations) {
var (
@ -2106,13 +2130,13 @@ loop:
// Values in the buffer are guaranteed to be smaller than maxt.
if t >= mintHistograms {
if histograms == nil {
histograms = getHPointSlice(16)
histograms = getMatrixSelectorHPoints()
}
n := len(histograms)
if n < cap(histograms) {
histograms = histograms[:n+1]
} else {
histograms = append(histograms, HPoint{})
histograms = append(histograms, HPoint{H: &histogram.FloatHistogram{}})
}
histograms[n].T, histograms[n].H = buf.AtFloatHistogram(histograms[n].H)
if value.IsStaleNaN(histograms[n].H.Sum) {
@ -2145,23 +2169,28 @@ loop:
// The sought sample might also be in the range.
switch soughtValueType {
case chunkenc.ValFloatHistogram, chunkenc.ValHistogram:
t := it.AtT()
if t == maxt {
_, h := it.AtFloatHistogram()
if !value.IsStaleNaN(h.Sum) {
if ev.currentSamples >= ev.maxSamples {
ev.error(ErrTooManySamples(env))
}
if histograms == nil {
histograms = getHPointSlice(16)
}
// The last sample comes directly from the iterator, so we need to copy it to
// avoid having the same reference twice in the buffer.
point := HPoint{T: t, H: h.Copy()}
histograms = append(histograms, point)
ev.currentSamples += point.size()
}
if it.AtT() != maxt {
break
}
if histograms == nil {
histograms = getMatrixSelectorHPoints()
}
n := len(histograms)
if n < cap(histograms) {
histograms = histograms[:n+1]
} else {
histograms = append(histograms, HPoint{H: &histogram.FloatHistogram{}})
}
histograms[n].T, histograms[n].H = it.AtFloatHistogram(histograms[n].H)
if value.IsStaleNaN(histograms[n].H.Sum) {
histograms = histograms[:n]
break
}
if ev.currentSamples >= ev.maxSamples {
ev.error(ErrTooManySamples(env))
}
ev.currentSamples += histograms[n].size()
case chunkenc.ValFloat:
t, f := it.At()
if t == maxt && !value.IsStaleNaN(f) {

View file

@ -464,11 +464,11 @@ func (ssi *storageSeriesIterator) At() (t int64, v float64) {
return ssi.currT, ssi.currF
}
func (ssi *storageSeriesIterator) AtHistogram() (int64, *histogram.Histogram) {
func (ssi *storageSeriesIterator) AtHistogram(*histogram.Histogram) (int64, *histogram.Histogram) {
panic(errors.New("storageSeriesIterator: AtHistogram not supported"))
}
func (ssi *storageSeriesIterator) AtFloatHistogram() (int64, *histogram.FloatHistogram) {
func (ssi *storageSeriesIterator) AtFloatHistogram(*histogram.FloatHistogram) (int64, *histogram.FloatHistogram) {
return ssi.currT, ssi.currH
}

View file

@ -1397,7 +1397,7 @@ func TestNativeHistogramsInRecordingRules(t *testing.T) {
it := s.Iterator(nil)
require.Equal(t, chunkenc.ValFloatHistogram, it.Next())
tsp, fh := it.AtFloatHistogram()
tsp, fh := it.AtFloatHistogram(nil)
require.Equal(t, ts.Add(10*time.Second).UnixMilli(), tsp)
require.Equal(t, expHist, fh)
require.Equal(t, chunkenc.ValNone, it.Next())

View file

@ -24,6 +24,9 @@ import (
// BufferedSeriesIterator wraps an iterator with a look-back buffer.
type BufferedSeriesIterator struct {
hReader histogram.Histogram
fhReader histogram.FloatHistogram
it chunkenc.Iterator
buf *sampleRing
delta int64
@ -118,10 +121,10 @@ func (b *BufferedSeriesIterator) Next() chunkenc.ValueType {
t, f := b.it.At()
b.buf.addF(fSample{t: t, f: f})
case chunkenc.ValHistogram:
t, h := b.it.AtHistogram()
t, h := b.it.AtHistogram(&b.hReader)
b.buf.addH(hSample{t: t, h: h})
case chunkenc.ValFloatHistogram:
t, fh := b.it.AtFloatHistogram()
t, fh := b.it.AtFloatHistogram(&b.fhReader)
b.buf.addFH(fhSample{t: t, fh: fh})
default:
panic(fmt.Errorf("BufferedSeriesIterator: unknown value type %v", b.valueType))
@ -140,13 +143,13 @@ func (b *BufferedSeriesIterator) At() (int64, float64) {
}
// AtHistogram returns the current histogram element of the iterator.
func (b *BufferedSeriesIterator) AtHistogram() (int64, *histogram.Histogram) {
return b.it.AtHistogram()
func (b *BufferedSeriesIterator) AtHistogram(fh *histogram.Histogram) (int64, *histogram.Histogram) {
return b.it.AtHistogram(fh)
}
// AtFloatHistogram returns the current float-histogram element of the iterator.
func (b *BufferedSeriesIterator) AtFloatHistogram() (int64, *histogram.FloatHistogram) {
return b.it.AtFloatHistogram()
func (b *BufferedSeriesIterator) AtFloatHistogram(fh *histogram.FloatHistogram) (int64, *histogram.FloatHistogram) {
return b.it.AtFloatHistogram(fh)
}
// AtT returns the current timestamp of the iterator.
@ -378,7 +381,11 @@ func (it *SampleRingIterator) AtFloatHistogram(fh *histogram.FloatHistogram) (in
if it.fh == nil {
return it.t, it.h.ToFloat(fh)
}
return it.t, it.fh
if fh != nil {
it.fh.CopyTo(fh)
return it.t, fh
}
return it.t, it.fh.Copy()
}
func (it *SampleRingIterator) AtT() int64 {
@ -672,7 +679,12 @@ func addH(s hSample, buf []hSample, r *sampleRing) []hSample {
}
}
buf[r.i] = s
buf[r.i].t = s.t
if buf[r.i].h == nil {
buf[r.i].h = s.h.Copy()
} else {
s.h.CopyTo(buf[r.i].h)
}
r.l++
// Free head of the buffer of samples that just fell out of the range.
@ -711,7 +723,12 @@ func addFH(s fhSample, buf []fhSample, r *sampleRing) []fhSample {
}
}
buf[r.i] = s
buf[r.i].t = s.t
if buf[r.i].fh == nil {
buf[r.i].fh = s.fh.Copy()
} else {
s.fh.CopyTo(buf[r.i].fh)
}
r.l++
// Free head of the buffer of samples that just fell out of the range.

View file

@ -277,11 +277,11 @@ func (m *mockSeriesIterator) At() (int64, float64) { return m.at() }
func (m *mockSeriesIterator) Next() chunkenc.ValueType { return m.next() }
func (m *mockSeriesIterator) Err() error { return m.err() }
func (m *mockSeriesIterator) AtHistogram() (int64, *histogram.Histogram) {
func (m *mockSeriesIterator) AtHistogram(*histogram.Histogram) (int64, *histogram.Histogram) {
return 0, nil // Not really mocked.
}
func (m *mockSeriesIterator) AtFloatHistogram() (int64, *histogram.FloatHistogram) {
func (m *mockSeriesIterator) AtFloatHistogram(*histogram.FloatHistogram) (int64, *histogram.FloatHistogram) {
return 0, nil // Not really mocked.
}
@ -303,11 +303,11 @@ func (it *fakeSeriesIterator) At() (int64, float64) {
return it.idx * it.step, 123 // Value doesn't matter.
}
func (it *fakeSeriesIterator) AtHistogram() (int64, *histogram.Histogram) {
func (it *fakeSeriesIterator) AtHistogram(*histogram.Histogram) (int64, *histogram.Histogram) {
return it.idx * it.step, &histogram.Histogram{} // Value doesn't matter.
}
func (it *fakeSeriesIterator) AtFloatHistogram() (int64, *histogram.FloatHistogram) {
func (it *fakeSeriesIterator) AtFloatHistogram(*histogram.FloatHistogram) (int64, *histogram.FloatHistogram) {
return it.idx * it.step, &histogram.FloatHistogram{} // Value doesn't matter.
}

View file

@ -113,7 +113,7 @@ func (b *MemoizedSeriesIterator) Next() chunkenc.ValueType {
b.prevFloatHistogram = nil
case chunkenc.ValHistogram, chunkenc.ValFloatHistogram:
b.prevValue = 0
b.prevTime, b.prevFloatHistogram = b.it.AtFloatHistogram()
b.prevTime, b.prevFloatHistogram = b.it.AtFloatHistogram(nil)
}
b.valueType = b.it.Next()
@ -133,7 +133,7 @@ func (b *MemoizedSeriesIterator) At() (int64, float64) {
// AtFloatHistogram returns the current float-histogram element of the iterator.
func (b *MemoizedSeriesIterator) AtFloatHistogram() (int64, *histogram.FloatHistogram) {
return b.it.AtFloatHistogram()
return b.it.AtFloatHistogram(nil)
}
// Err returns the last encountered error.

View file

@ -525,11 +525,11 @@ func (c *chainSampleIterator) At() (t int64, v float64) {
return c.curr.At()
}
func (c *chainSampleIterator) AtHistogram() (int64, *histogram.Histogram) {
func (c *chainSampleIterator) AtHistogram(h *histogram.Histogram) (int64, *histogram.Histogram) {
if c.curr == nil {
panic("chainSampleIterator.AtHistogram called before first .Next or after .Next returned false.")
}
t, h := c.curr.AtHistogram()
t, h := c.curr.AtHistogram(h)
// If the current sample is not consecutive with the previous one, we
// cannot be sure anymore about counter resets for counter histograms.
// TODO(beorn7): If a `NotCounterReset` sample is followed by a
@ -542,11 +542,11 @@ func (c *chainSampleIterator) AtHistogram() (int64, *histogram.Histogram) {
return t, h
}
func (c *chainSampleIterator) AtFloatHistogram() (int64, *histogram.FloatHistogram) {
func (c *chainSampleIterator) AtFloatHistogram(fh *histogram.FloatHistogram) (int64, *histogram.FloatHistogram) {
if c.curr == nil {
panic("chainSampleIterator.AtFloatHistogram called before first .Next or after .Next returned false.")
}
t, fh := c.curr.AtFloatHistogram()
t, fh := c.curr.AtFloatHistogram(fh)
// If the current sample is not consecutive with the previous one, we
// cannot be sure anymore about counter resets for counter histograms.
// TODO(beorn7): If a `NotCounterReset` sample is followed by a

View file

@ -1173,10 +1173,10 @@ func TestChainSampleIteratorSeek(t *testing.T) {
t, f := merged.At()
actual = append(actual, fSample{t, f})
case chunkenc.ValHistogram:
t, h := merged.AtHistogram()
t, h := merged.AtHistogram(nil)
actual = append(actual, hSample{t, h})
case chunkenc.ValFloatHistogram:
t, fh := merged.AtFloatHistogram()
t, fh := merged.AtFloatHistogram(nil)
actual = append(actual, fhSample{t, fh})
}
s, err := ExpandSamples(merged, nil)
@ -1259,10 +1259,10 @@ func TestChainSampleIteratorSeekHistogramCounterResetHint(t *testing.T) {
t, f := merged.At()
actual = append(actual, fSample{t, f})
case chunkenc.ValHistogram:
t, h := merged.AtHistogram()
t, h := merged.AtHistogram(nil)
actual = append(actual, hSample{t, h})
case chunkenc.ValFloatHistogram:
t, fh := merged.AtFloatHistogram()
t, fh := merged.AtFloatHistogram(nil)
actual = append(actual, fhSample{t, fh})
}
s, err := ExpandSamples(merged, nil)
@ -1629,11 +1629,11 @@ func (e errIterator) At() (int64, float64) {
return 0, 0
}
func (e errIterator) AtHistogram() (int64, *histogram.Histogram) {
func (e errIterator) AtHistogram(*histogram.Histogram) (int64, *histogram.Histogram) {
return 0, nil
}
func (e errIterator) AtFloatHistogram() (int64, *histogram.FloatHistogram) {
func (e errIterator) AtFloatHistogram(*histogram.FloatHistogram) (int64, *histogram.FloatHistogram) {
return 0, nil
}

View file

@ -152,10 +152,10 @@ func ToQueryResult(ss storage.SeriesSet, sampleLimit int) (*prompb.QueryResult,
Value: val,
})
case chunkenc.ValHistogram:
ts, h := iter.AtHistogram()
ts, h := iter.AtHistogram(nil)
histograms = append(histograms, HistogramToHistogramProto(ts, h))
case chunkenc.ValFloatHistogram:
ts, fh := iter.AtFloatHistogram()
ts, fh := iter.AtFloatHistogram(nil)
histograms = append(histograms, FloatHistogramToHistogramProto(ts, fh))
default:
return nil, ss.Warnings(), fmt.Errorf("unrecognized value type: %s", valType)
@ -475,7 +475,7 @@ func (c *concreteSeriesIterator) At() (t int64, v float64) {
}
// AtHistogram implements chunkenc.Iterator.
func (c *concreteSeriesIterator) AtHistogram() (int64, *histogram.Histogram) {
func (c *concreteSeriesIterator) AtHistogram(*histogram.Histogram) (int64, *histogram.Histogram) {
if c.curValType != chunkenc.ValHistogram {
panic("iterator is not on an integer histogram sample")
}
@ -484,7 +484,7 @@ func (c *concreteSeriesIterator) AtHistogram() (int64, *histogram.Histogram) {
}
// AtFloatHistogram implements chunkenc.Iterator.
func (c *concreteSeriesIterator) AtFloatHistogram() (int64, *histogram.FloatHistogram) {
func (c *concreteSeriesIterator) AtFloatHistogram(*histogram.FloatHistogram) (int64, *histogram.FloatHistogram) {
switch c.curValType {
case chunkenc.ValHistogram:
fh := c.series.histograms[c.histogramsCur]

View file

@ -278,31 +278,31 @@ func TestConcreteSeriesIterator_HistogramSamples(t *testing.T) {
// Seek to the first sample with ts=1.
require.Equal(t, chunkenc.ValHistogram, it.Seek(1))
ts, v := it.AtHistogram()
ts, v := it.AtHistogram(nil)
require.Equal(t, int64(1), ts)
require.Equal(t, histograms[0], v)
// Seek one further, next sample still has ts=1.
require.Equal(t, chunkenc.ValHistogram, it.Next())
ts, v = it.AtHistogram()
ts, v = it.AtHistogram(nil)
require.Equal(t, int64(1), ts)
require.Equal(t, histograms[1], v)
// Seek again to 1 and make sure we stay where we are.
require.Equal(t, chunkenc.ValHistogram, it.Seek(1))
ts, v = it.AtHistogram()
ts, v = it.AtHistogram(nil)
require.Equal(t, int64(1), ts)
require.Equal(t, histograms[1], v)
// Another seek.
require.Equal(t, chunkenc.ValHistogram, it.Seek(3))
ts, v = it.AtHistogram()
ts, v = it.AtHistogram(nil)
require.Equal(t, int64(3), ts)
require.Equal(t, histograms[3], v)
// And we don't go back.
require.Equal(t, chunkenc.ValHistogram, it.Seek(2))
ts, v = it.AtHistogram()
ts, v = it.AtHistogram(nil)
require.Equal(t, int64(3), ts)
require.Equal(t, histograms[3], v)
@ -347,12 +347,12 @@ func TestConcreteSeriesIterator_FloatAndHistogramSamples(t *testing.T) {
fh *histogram.FloatHistogram
)
require.Equal(t, chunkenc.ValHistogram, it.Next())
ts, h = it.AtHistogram()
ts, h = it.AtHistogram(nil)
require.Equal(t, int64(1), ts)
require.Equal(t, histograms[0], h)
require.Equal(t, chunkenc.ValHistogram, it.Next())
ts, h = it.AtHistogram()
ts, h = it.AtHistogram(nil)
require.Equal(t, int64(2), ts)
require.Equal(t, histograms[1], h)
@ -393,13 +393,13 @@ func TestConcreteSeriesIterator_FloatAndHistogramSamples(t *testing.T) {
require.Equal(t, 8., v)
require.Equal(t, chunkenc.ValHistogram, it.Next())
ts, h = it.AtHistogram()
ts, h = it.AtHistogram(nil)
require.Equal(t, int64(16), ts)
require.Equal(t, histograms[10], h)
// Getting a float histogram from an int histogram works.
require.Equal(t, chunkenc.ValHistogram, it.Next())
ts, fh = it.AtFloatHistogram()
ts, fh = it.AtFloatHistogram(nil)
require.Equal(t, int64(17), ts)
expected := HistogramProtoToFloatHistogram(HistogramToHistogramProto(int64(17), histograms[11]))
require.Equal(t, expected, fh)

View file

@ -123,12 +123,12 @@ func (it *listSeriesIterator) At() (int64, float64) {
return s.T(), s.F()
}
func (it *listSeriesIterator) AtHistogram() (int64, *histogram.Histogram) {
func (it *listSeriesIterator) AtHistogram(*histogram.Histogram) (int64, *histogram.Histogram) {
s := it.samples.Get(it.idx)
return s.T(), s.H()
}
func (it *listSeriesIterator) AtFloatHistogram() (int64, *histogram.FloatHistogram) {
func (it *listSeriesIterator) AtFloatHistogram(*histogram.FloatHistogram) (int64, *histogram.FloatHistogram) {
s := it.samples.Get(it.idx)
return s.T(), s.FH()
}
@ -337,7 +337,7 @@ func (s *seriesToChunkEncoder) Iterator(it chunks.Iterator) chunks.Iterator {
t, v = seriesIter.At()
app.Append(t, v)
case chunkenc.ValHistogram:
t, h = seriesIter.AtHistogram()
t, h = seriesIter.AtHistogram(nil)
newChk, recoded, app, err = app.AppendHistogram(nil, t, h, false)
if err != nil {
return errChunksIterator{err: err}
@ -352,7 +352,7 @@ func (s *seriesToChunkEncoder) Iterator(it chunks.Iterator) chunks.Iterator {
chk = newChk
}
case chunkenc.ValFloatHistogram:
t, fh = seriesIter.AtFloatHistogram()
t, fh = seriesIter.AtFloatHistogram(nil)
newChk, recoded, app, err = app.AppendFloatHistogram(nil, t, fh, false)
if err != nil {
return errChunksIterator{err: err}
@ -438,10 +438,10 @@ func ExpandSamples(iter chunkenc.Iterator, newSampleFn func(t int64, f float64,
}
result = append(result, newSampleFn(t, f, nil, nil))
case chunkenc.ValHistogram:
t, h := iter.AtHistogram()
t, h := iter.AtHistogram(nil)
result = append(result, newSampleFn(t, 0, h, nil))
case chunkenc.ValFloatHistogram:
t, fh := iter.AtFloatHistogram()
t, fh := iter.AtFloatHistogram(nil)
result = append(result, newSampleFn(t, 0, nil, fh))
}
}

View file

@ -555,10 +555,10 @@ func createHead(tb testing.TB, w *wlog.WL, series []storage.Series, chunkDir str
t, v := it.At()
ref, err = app.Append(ref, lset, t, v)
case chunkenc.ValHistogram:
t, h := it.AtHistogram()
t, h := it.AtHistogram(nil)
ref, err = app.AppendHistogram(ref, lset, t, h, nil)
case chunkenc.ValFloatHistogram:
t, fh := it.AtFloatHistogram()
t, fh := it.AtFloatHistogram(nil)
ref, err = app.AppendHistogram(ref, lset, t, nil, fh)
default:
err = fmt.Errorf("unknown sample type %s", typ.String())

View file

@ -131,16 +131,20 @@ type Iterator interface {
// At returns the current timestamp/value pair if the value is a float.
// Before the iterator has advanced, the behaviour is unspecified.
At() (int64, float64)
// AtHistogram returns the current timestamp/value pair if the value is
// a histogram with integer counts. Before the iterator has advanced,
// the behaviour is unspecified.
AtHistogram() (int64, *histogram.Histogram)
// AtHistogram returns the current timestamp/value pair if the value is a
// histogram with integer counts. Before the iterator has advanced, the behaviour
// is unspecified.
// The method accepts an optional Histogram object which will be
// reused when not nil. Otherwise, a new Histogram object will be allocated.
AtHistogram(*histogram.Histogram) (int64, *histogram.Histogram)
// AtFloatHistogram returns the current timestamp/value pair if the
// value is a histogram with floating-point counts. It also works if the
// value is a histogram with integer counts, in which case a
// FloatHistogram copy of the histogram is returned. Before the iterator
// has advanced, the behaviour is unspecified.
AtFloatHistogram() (int64, *histogram.FloatHistogram)
// The method accepts an optional FloatHistogram object which will be
// reused when not nil. Otherwise, a new FloatHistogram object will be allocated.
AtFloatHistogram(*histogram.FloatHistogram) (int64, *histogram.FloatHistogram)
// AtT returns the current timestamp.
// Before the iterator has advanced, the behaviour is unspecified.
AtT() int64
@ -222,9 +226,11 @@ func (it *mockSeriesIterator) At() (int64, float64) {
return it.timeStamps[it.currIndex], it.values[it.currIndex]
}
func (it *mockSeriesIterator) AtHistogram() (int64, *histogram.Histogram) { return math.MinInt64, nil }
func (it *mockSeriesIterator) AtHistogram(*histogram.Histogram) (int64, *histogram.Histogram) {
return math.MinInt64, nil
}
func (it *mockSeriesIterator) AtFloatHistogram() (int64, *histogram.FloatHistogram) {
func (it *mockSeriesIterator) AtFloatHistogram(*histogram.FloatHistogram) (int64, *histogram.FloatHistogram) {
return math.MinInt64, nil
}
@ -249,13 +255,18 @@ func NewNopIterator() Iterator {
type nopIterator struct{}
func (nopIterator) Next() ValueType { return ValNone }
func (nopIterator) Seek(int64) ValueType { return ValNone }
func (nopIterator) At() (int64, float64) { return math.MinInt64, 0 }
func (nopIterator) AtHistogram() (int64, *histogram.Histogram) { return math.MinInt64, nil }
func (nopIterator) AtFloatHistogram() (int64, *histogram.FloatHistogram) { return math.MinInt64, nil }
func (nopIterator) AtT() int64 { return math.MinInt64 }
func (nopIterator) Err() error { return nil }
func (nopIterator) Next() ValueType { return ValNone }
func (nopIterator) Seek(int64) ValueType { return ValNone }
func (nopIterator) At() (int64, float64) { return math.MinInt64, 0 }
func (nopIterator) AtHistogram(*histogram.Histogram) (int64, *histogram.Histogram) {
return math.MinInt64, nil
}
func (nopIterator) AtFloatHistogram(*histogram.FloatHistogram) (int64, *histogram.FloatHistogram) {
return math.MinInt64, nil
}
func (nopIterator) AtT() int64 { return math.MinInt64 }
func (nopIterator) Err() error { return nil }
// Pool is used to create and reuse chunk references to avoid allocations.
type Pool interface {

View file

@ -527,7 +527,7 @@ func (a *FloatHistogramAppender) recode(
numPositiveBuckets, numNegativeBuckets := countSpans(positiveSpans), countSpans(negativeSpans)
for it.Next() == ValFloatHistogram {
tOld, hOld := it.AtFloatHistogram()
tOld, hOld := it.AtFloatHistogram(nil)
// We have to newly allocate slices for the modified buckets
// here because they are kept by the appender until the next
@ -728,27 +728,50 @@ func (it *floatHistogramIterator) At() (int64, float64) {
panic("cannot call floatHistogramIterator.At")
}
func (it *floatHistogramIterator) AtHistogram() (int64, *histogram.Histogram) {
func (it *floatHistogramIterator) AtHistogram(*histogram.Histogram) (int64, *histogram.Histogram) {
panic("cannot call floatHistogramIterator.AtHistogram")
}
func (it *floatHistogramIterator) AtFloatHistogram() (int64, *histogram.FloatHistogram) {
func (it *floatHistogramIterator) AtFloatHistogram(fh *histogram.FloatHistogram) (int64, *histogram.FloatHistogram) {
if value.IsStaleNaN(it.sum.value) {
return it.t, &histogram.FloatHistogram{Sum: it.sum.value}
}
it.atFloatHistogramCalled = true
return it.t, &histogram.FloatHistogram{
CounterResetHint: counterResetHint(it.counterResetHeader, it.numRead),
Count: it.cnt.value,
ZeroCount: it.zCnt.value,
Sum: it.sum.value,
ZeroThreshold: it.zThreshold,
Schema: it.schema,
PositiveSpans: it.pSpans,
NegativeSpans: it.nSpans,
PositiveBuckets: it.pBuckets,
NegativeBuckets: it.nBuckets,
if fh == nil {
it.atFloatHistogramCalled = true
return it.t, &histogram.FloatHistogram{
CounterResetHint: counterResetHint(it.counterResetHeader, it.numRead),
Count: it.cnt.value,
ZeroCount: it.zCnt.value,
Sum: it.sum.value,
ZeroThreshold: it.zThreshold,
Schema: it.schema,
PositiveSpans: it.pSpans,
NegativeSpans: it.nSpans,
PositiveBuckets: it.pBuckets,
NegativeBuckets: it.nBuckets,
}
}
fh.CounterResetHint = counterResetHint(it.counterResetHeader, it.numRead)
fh.Schema = it.schema
fh.ZeroThreshold = it.zThreshold
fh.ZeroCount = it.zCnt.value
fh.Count = it.cnt.value
fh.Sum = it.sum.value
fh.PositiveSpans = resize(fh.PositiveSpans, len(it.pSpans))
copy(fh.PositiveSpans, it.pSpans)
fh.NegativeSpans = resize(fh.NegativeSpans, len(it.nSpans))
copy(fh.NegativeSpans, it.nSpans)
fh.PositiveBuckets = resize(fh.PositiveBuckets, len(it.pBuckets))
copy(fh.PositiveBuckets, it.pBuckets)
fh.NegativeBuckets = resize(fh.NegativeBuckets, len(it.nBuckets))
copy(fh.NegativeBuckets, it.nBuckets)
return it.t, fh
}
func (it *floatHistogramIterator) AtT() int64 {

View file

@ -140,7 +140,7 @@ func TestFloatHistogramChunkSameBuckets(t *testing.T) {
require.NoError(t, it.Err())
var act []floatResult
for it.Next() == ValFloatHistogram {
fts, fh := it.AtFloatHistogram()
fts, fh := it.AtFloatHistogram(nil)
act = append(act, floatResult{t: fts, h: fh})
}
require.NoError(t, it.Err())
@ -150,7 +150,7 @@ func TestFloatHistogramChunkSameBuckets(t *testing.T) {
it2 := c.Iterator(it)
var act2 []floatResult
for it2.Next() == ValFloatHistogram {
fts, fh := it2.AtFloatHistogram()
fts, fh := it2.AtFloatHistogram(nil)
act2 = append(act2, floatResult{t: fts, h: fh})
}
require.NoError(t, it2.Err())
@ -164,7 +164,7 @@ func TestFloatHistogramChunkSameBuckets(t *testing.T) {
it3 := c.iterator(itX)
var act3 []floatResult
for it3.Next() == ValFloatHistogram {
fts, fh := it3.AtFloatHistogram()
fts, fh := it3.AtFloatHistogram(nil)
act3 = append(act3, floatResult{t: fts, h: fh})
}
require.NoError(t, it3.Err())
@ -178,10 +178,10 @@ func TestFloatHistogramChunkSameBuckets(t *testing.T) {
// Below ones should not matter.
require.Equal(t, ValFloatHistogram, it4.Seek(exp[mid].t))
require.Equal(t, ValFloatHistogram, it4.Seek(exp[mid].t))
fts, fh := it4.AtFloatHistogram()
fts, fh := it4.AtFloatHistogram(nil)
act4 = append(act4, floatResult{t: fts, h: fh})
for it4.Next() == ValFloatHistogram {
fts, fh := it4.AtFloatHistogram()
fts, fh := it4.AtFloatHistogram(nil)
act4 = append(act4, floatResult{t: fts, h: fh})
}
require.NoError(t, it4.Err())
@ -272,7 +272,7 @@ func TestFloatHistogramChunkBucketChanges(t *testing.T) {
it := c.Iterator(nil)
var act []floatResult
for it.Next() == ValFloatHistogram {
fts, fh := it.AtFloatHistogram()
fts, fh := it.AtFloatHistogram(nil)
act = append(act, floatResult{t: fts, h: fh})
}
require.NoError(t, it.Err())

View file

@ -558,7 +558,7 @@ func (a *HistogramAppender) recode(
numPositiveBuckets, numNegativeBuckets := countSpans(positiveSpans), countSpans(negativeSpans)
for it.Next() == ValHistogram {
tOld, hOld := it.AtHistogram()
tOld, hOld := it.AtHistogram(nil)
// We have to newly allocate slices for the modified buckets
// here because they are kept by the appender until the next
@ -776,42 +776,96 @@ func (it *histogramIterator) At() (int64, float64) {
panic("cannot call histogramIterator.At")
}
func (it *histogramIterator) AtHistogram() (int64, *histogram.Histogram) {
func (it *histogramIterator) AtHistogram(h *histogram.Histogram) (int64, *histogram.Histogram) {
if value.IsStaleNaN(it.sum) {
return it.t, &histogram.Histogram{Sum: it.sum}
}
it.atHistogramCalled = true
return it.t, &histogram.Histogram{
CounterResetHint: counterResetHint(it.counterResetHeader, it.numRead),
Count: it.cnt,
ZeroCount: it.zCnt,
Sum: it.sum,
ZeroThreshold: it.zThreshold,
Schema: it.schema,
PositiveSpans: it.pSpans,
NegativeSpans: it.nSpans,
PositiveBuckets: it.pBuckets,
NegativeBuckets: it.nBuckets,
if h == nil {
it.atHistogramCalled = true
return it.t, &histogram.Histogram{
CounterResetHint: counterResetHint(it.counterResetHeader, it.numRead),
Count: it.cnt,
ZeroCount: it.zCnt,
Sum: it.sum,
ZeroThreshold: it.zThreshold,
Schema: it.schema,
PositiveSpans: it.pSpans,
NegativeSpans: it.nSpans,
PositiveBuckets: it.pBuckets,
NegativeBuckets: it.nBuckets,
}
}
h.CounterResetHint = counterResetHint(it.counterResetHeader, it.numRead)
h.Schema = it.schema
h.ZeroThreshold = it.zThreshold
h.ZeroCount = it.zCnt
h.Count = it.cnt
h.Sum = it.sum
h.PositiveSpans = resize(h.PositiveSpans, len(it.pSpans))
copy(h.PositiveSpans, it.pSpans)
h.NegativeSpans = resize(h.NegativeSpans, len(it.nSpans))
copy(h.NegativeSpans, it.nSpans)
h.PositiveBuckets = resize(h.PositiveBuckets, len(it.pBuckets))
copy(h.PositiveBuckets, it.pBuckets)
h.NegativeBuckets = resize(h.NegativeBuckets, len(it.nBuckets))
copy(h.NegativeBuckets, it.nBuckets)
return it.t, h
}
func (it *histogramIterator) AtFloatHistogram() (int64, *histogram.FloatHistogram) {
func (it *histogramIterator) AtFloatHistogram(fh *histogram.FloatHistogram) (int64, *histogram.FloatHistogram) {
if value.IsStaleNaN(it.sum) {
return it.t, &histogram.FloatHistogram{Sum: it.sum}
}
it.atFloatHistogramCalled = true
return it.t, &histogram.FloatHistogram{
CounterResetHint: counterResetHint(it.counterResetHeader, it.numRead),
Count: float64(it.cnt),
ZeroCount: float64(it.zCnt),
Sum: it.sum,
ZeroThreshold: it.zThreshold,
Schema: it.schema,
PositiveSpans: it.pSpans,
NegativeSpans: it.nSpans,
PositiveBuckets: it.pFloatBuckets,
NegativeBuckets: it.nFloatBuckets,
if fh == nil {
it.atFloatHistogramCalled = true
return it.t, &histogram.FloatHistogram{
CounterResetHint: counterResetHint(it.counterResetHeader, it.numRead),
Count: float64(it.cnt),
ZeroCount: float64(it.zCnt),
Sum: it.sum,
ZeroThreshold: it.zThreshold,
Schema: it.schema,
PositiveSpans: it.pSpans,
NegativeSpans: it.nSpans,
PositiveBuckets: it.pFloatBuckets,
NegativeBuckets: it.nFloatBuckets,
}
}
fh.CounterResetHint = counterResetHint(it.counterResetHeader, it.numRead)
fh.Schema = it.schema
fh.ZeroThreshold = it.zThreshold
fh.ZeroCount = float64(it.zCnt)
fh.Count = float64(it.cnt)
fh.Sum = it.sum
fh.PositiveSpans = resize(fh.PositiveSpans, len(it.pSpans))
copy(fh.PositiveSpans, it.pSpans)
fh.NegativeSpans = resize(fh.NegativeSpans, len(it.nSpans))
copy(fh.NegativeSpans, it.nSpans)
fh.PositiveBuckets = resize(fh.PositiveBuckets, len(it.pBuckets))
var currentPositive float64
for i, b := range it.pBuckets {
currentPositive += float64(b)
fh.PositiveBuckets[i] = currentPositive
}
fh.NegativeBuckets = resize(fh.NegativeBuckets, len(it.nBuckets))
var currentNegative float64
for i, b := range it.nBuckets {
currentNegative += float64(b)
fh.NegativeBuckets[i] = currentNegative
}
return it.t, fh
}
func (it *histogramIterator) AtT() int64 {
@ -1056,3 +1110,10 @@ func (it *histogramIterator) readSum() bool {
}
return true
}
func resize[T any](items []T, n int) []T {
if cap(items) < n {
return make([]T, n)
}
return items[:n]
}

View file

@ -141,8 +141,8 @@ func TestHistogramChunkSameBuckets(t *testing.T) {
require.NoError(t, it.Err())
var act []result
for it.Next() == ValHistogram {
ts, h := it.AtHistogram()
fts, fh := it.AtFloatHistogram()
ts, h := it.AtHistogram(nil)
fts, fh := it.AtFloatHistogram(nil)
require.Equal(t, ts, fts)
act = append(act, result{t: ts, h: h, fh: fh})
}
@ -153,8 +153,8 @@ func TestHistogramChunkSameBuckets(t *testing.T) {
it2 := c.Iterator(it)
var act2 []result
for it2.Next() == ValHistogram {
ts, h := it2.AtHistogram()
fts, fh := it2.AtFloatHistogram()
ts, h := it2.AtHistogram(nil)
fts, fh := it2.AtFloatHistogram(nil)
require.Equal(t, ts, fts)
act2 = append(act2, result{t: ts, h: h, fh: fh})
}
@ -169,8 +169,8 @@ func TestHistogramChunkSameBuckets(t *testing.T) {
it3 := c.iterator(itX)
var act3 []result
for it3.Next() == ValHistogram {
ts, h := it3.AtHistogram()
fts, fh := it3.AtFloatHistogram()
ts, h := it3.AtHistogram(nil)
fts, fh := it3.AtFloatHistogram(nil)
require.Equal(t, ts, fts)
act3 = append(act3, result{t: ts, h: h, fh: fh})
}
@ -185,13 +185,13 @@ func TestHistogramChunkSameBuckets(t *testing.T) {
// Below ones should not matter.
require.Equal(t, ValHistogram, it4.Seek(exp[mid].t))
require.Equal(t, ValHistogram, it4.Seek(exp[mid].t))
ts, h = it4.AtHistogram()
fts, fh := it4.AtFloatHistogram()
ts, h = it4.AtHistogram(nil)
fts, fh := it4.AtFloatHistogram(nil)
require.Equal(t, ts, fts)
act4 = append(act4, result{t: ts, h: h, fh: fh})
for it4.Next() == ValHistogram {
ts, h := it4.AtHistogram()
fts, fh := it4.AtFloatHistogram()
ts, h := it4.AtHistogram(nil)
fts, fh := it4.AtFloatHistogram(nil)
require.Equal(t, ts, fts)
act4 = append(act4, result{t: ts, h: h, fh: fh})
}
@ -284,8 +284,8 @@ func TestHistogramChunkBucketChanges(t *testing.T) {
it := c.Iterator(nil)
var act []result
for it.Next() == ValHistogram {
ts, h := it.AtHistogram()
fts, fh := it.AtFloatHistogram()
ts, h := it.AtHistogram(nil)
fts, fh := it.AtFloatHistogram(nil)
require.Equal(t, ts, fts)
act = append(act, result{t: ts, h: h, fh: fh})
}
@ -897,7 +897,7 @@ func TestAtFloatHistogram(t *testing.T) {
it := chk.Iterator(nil)
i := int64(0)
for it.Next() != ValNone {
ts, h := it.AtFloatHistogram()
ts, h := it.AtFloatHistogram(nil)
require.Equal(t, i, ts)
require.Equal(t, expOutput[i], h, "histogram %d unequal", i)
i++

View file

@ -260,11 +260,11 @@ func (it *xorIterator) At() (int64, float64) {
return it.t, it.val
}
func (it *xorIterator) AtHistogram() (int64, *histogram.Histogram) {
func (it *xorIterator) AtHistogram(*histogram.Histogram) (int64, *histogram.Histogram) {
panic("cannot call xorIterator.AtHistogram")
}
func (it *xorIterator) AtFloatHistogram() (int64, *histogram.FloatHistogram) {
func (it *xorIterator) AtFloatHistogram(*histogram.FloatHistogram) (int64, *histogram.FloatHistogram) {
panic("cannot call xorIterator.AtFloatHistogram")
}

View file

@ -226,10 +226,10 @@ func ChunkMetasToSamples(chunks []Meta) (result []Sample) {
t, v := it.At()
result = append(result, sample{t: t, f: v})
case chunkenc.ValHistogram:
t, h := it.AtHistogram()
t, h := it.AtHistogram(nil)
result = append(result, sample{t: t, h: h})
case chunkenc.ValFloatHistogram:
t, fh := it.AtFloatHistogram()
t, fh := it.AtFloatHistogram(nil)
result = append(result, sample{t: t, fh: fh})
default:
panic("unexpected value type")

View file

@ -1007,10 +1007,10 @@ func TestCompaction_populateBlock(t *testing.T) {
s.t, s.f = iter.At()
samples = append(samples, s)
case chunkenc.ValHistogram:
s.t, s.h = iter.AtHistogram()
s.t, s.h = iter.AtHistogram(nil)
samples = append(samples, s)
case chunkenc.ValFloatHistogram:
s.t, s.fh = iter.AtFloatHistogram()
s.t, s.fh = iter.AtFloatHistogram(nil)
samples = append(samples, s)
default:
require.Fail(t, "unexpected value type")

View file

@ -107,10 +107,10 @@ func query(t testing.TB, q storage.Querier, matchers ...*labels.Matcher) map[str
ts, v := it.At()
samples = append(samples, sample{t: ts, f: v})
case chunkenc.ValHistogram:
ts, h := it.AtHistogram()
ts, h := it.AtHistogram(nil)
samples = append(samples, sample{t: ts, h: h})
case chunkenc.ValFloatHistogram:
ts, fh := it.AtFloatHistogram()
ts, fh := it.AtFloatHistogram(nil)
samples = append(samples, sample{t: ts, fh: fh})
default:
t.Fatalf("unknown sample type in query %s", typ.String())
@ -6664,10 +6664,10 @@ func TestQueryHistogramFromBlocksWithCompaction(t *testing.T) {
ts, v := it.At()
slice = append(slice, sample{t: ts, f: v})
case chunkenc.ValHistogram:
ts, h := it.AtHistogram()
ts, h := it.AtHistogram(nil)
slice = append(slice, sample{t: ts, h: h})
case chunkenc.ValFloatHistogram:
ts, h := it.AtFloatHistogram()
ts, h := it.AtFloatHistogram(nil)
slice = append(slice, sample{t: ts, fh: h})
default:
t.Fatalf("unexpected sample value type %d", typ)

View file

@ -3377,10 +3377,10 @@ func TestAppendHistogram(t *testing.T) {
for typ := it.Next(); typ != chunkenc.ValNone; typ = it.Next() {
switch typ {
case chunkenc.ValHistogram:
ts, h := it.AtHistogram()
ts, h := it.AtHistogram(nil)
actHistograms = append(actHistograms, sample{t: ts, h: h})
case chunkenc.ValFloatHistogram:
ts, fh := it.AtFloatHistogram()
ts, fh := it.AtFloatHistogram(nil)
actFloatHistograms = append(actFloatHistograms, sample{t: ts, fh: fh})
}
}
@ -4025,10 +4025,10 @@ func testHistogramStaleSampleHelper(t *testing.T, floatHistogram bool) {
for typ := it.Next(); typ != chunkenc.ValNone; typ = it.Next() {
switch typ {
case chunkenc.ValHistogram:
t, h := it.AtHistogram()
t, h := it.AtHistogram(nil)
actHistograms = append(actHistograms, timedHistogram{t: t, h: h})
case chunkenc.ValFloatHistogram:
t, h := it.AtFloatHistogram()
t, h := it.AtFloatHistogram(nil)
actHistograms = append(actHistograms, timedHistogram{t: t, fh: h})
}
}

View file

@ -820,12 +820,12 @@ func (p *populateWithDelSeriesIterator) At() (int64, float64) {
return p.curr.At()
}
func (p *populateWithDelSeriesIterator) AtHistogram() (int64, *histogram.Histogram) {
return p.curr.AtHistogram()
func (p *populateWithDelSeriesIterator) AtHistogram(h *histogram.Histogram) (int64, *histogram.Histogram) {
return p.curr.AtHistogram(h)
}
func (p *populateWithDelSeriesIterator) AtFloatHistogram() (int64, *histogram.FloatHistogram) {
return p.curr.AtFloatHistogram()
func (p *populateWithDelSeriesIterator) AtFloatHistogram(fh *histogram.FloatHistogram) (int64, *histogram.FloatHistogram) {
return p.curr.AtFloatHistogram(fh)
}
func (p *populateWithDelSeriesIterator) AtT() int64 {
@ -937,7 +937,7 @@ func (p *populateWithDelChunkSeriesIterator) populateCurrForSingleChunk() bool {
break
}
var h *histogram.Histogram
t, h = p.currDelIter.AtHistogram()
t, h = p.currDelIter.AtHistogram(nil)
_, _, app, err = app.AppendHistogram(nil, t, h, true)
if err != nil {
break
@ -968,7 +968,7 @@ func (p *populateWithDelChunkSeriesIterator) populateCurrForSingleChunk() bool {
break
}
var h *histogram.FloatHistogram
t, h = p.currDelIter.AtFloatHistogram()
t, h = p.currDelIter.AtFloatHistogram(nil)
_, _, app, err = app.AppendFloatHistogram(nil, t, h, true)
if err != nil {
break
@ -1054,7 +1054,7 @@ func (p *populateWithDelChunkSeriesIterator) populateChunksFromIterable() bool {
case chunkenc.ValHistogram:
{
var v *histogram.Histogram
t, v = p.currDelIter.AtHistogram()
t, v = p.currDelIter.AtHistogram(nil)
// No need to set prevApp as AppendHistogram will set the
// counter reset header for the appender that's returned.
newChunk, recoded, app, err = app.AppendHistogram(nil, t, v, false)
@ -1062,7 +1062,7 @@ func (p *populateWithDelChunkSeriesIterator) populateChunksFromIterable() bool {
case chunkenc.ValFloatHistogram:
{
var v *histogram.FloatHistogram
t, v = p.currDelIter.AtFloatHistogram()
t, v = p.currDelIter.AtFloatHistogram(nil)
// No need to set prevApp as AppendHistogram will set the
// counter reset header for the appender that's returned.
newChunk, recoded, app, err = app.AppendFloatHistogram(nil, t, v, false)
@ -1233,13 +1233,13 @@ func (it *DeletedIterator) At() (int64, float64) {
return it.Iter.At()
}
func (it *DeletedIterator) AtHistogram() (int64, *histogram.Histogram) {
t, h := it.Iter.AtHistogram()
func (it *DeletedIterator) AtHistogram(h *histogram.Histogram) (int64, *histogram.Histogram) {
t, h := it.Iter.AtHistogram(h)
return t, h
}
func (it *DeletedIterator) AtFloatHistogram() (int64, *histogram.FloatHistogram) {
t, h := it.Iter.AtFloatHistogram()
func (it *DeletedIterator) AtFloatHistogram(fh *histogram.FloatHistogram) (int64, *histogram.FloatHistogram) {
t, h := it.Iter.AtFloatHistogram(fh)
return t, h
}

View file

@ -775,11 +775,11 @@ func (it *mockSampleIterator) At() (int64, float64) {
return it.s[it.idx].T(), it.s[it.idx].F()
}
func (it *mockSampleIterator) AtHistogram() (int64, *histogram.Histogram) {
func (it *mockSampleIterator) AtHistogram(*histogram.Histogram) (int64, *histogram.Histogram) {
return it.s[it.idx].T(), it.s[it.idx].H()
}
func (it *mockSampleIterator) AtFloatHistogram() (int64, *histogram.FloatHistogram) {
func (it *mockSampleIterator) AtFloatHistogram(*histogram.FloatHistogram) (int64, *histogram.FloatHistogram) {
return it.s[it.idx].T(), it.s[it.idx].FH()
}
@ -1822,12 +1822,12 @@ func checkCurrVal(t *testing.T, valType chunkenc.ValueType, it *populateWithDelS
require.Equal(t, int64(expectedTs), ts)
require.Equal(t, float64(expectedValue), v)
case chunkenc.ValHistogram:
ts, h := it.AtHistogram()
ts, h := it.AtHistogram(nil)
require.Equal(t, int64(expectedTs), ts)
h.CounterResetHint = histogram.UnknownCounterReset
require.Equal(t, tsdbutil.GenerateTestHistogram(expectedValue), h)
case chunkenc.ValFloatHistogram:
ts, h := it.AtFloatHistogram()
ts, h := it.AtFloatHistogram(nil)
require.Equal(t, int64(expectedTs), ts)
h.CounterResetHint = histogram.UnknownCounterReset
require.Equal(t, tsdbutil.GenerateTestFloatHistogram(expectedValue), h)

View file

@ -73,10 +73,10 @@ func CreateBlock(series []storage.Series, dir string, chunkRange int64, logger l
t, v := it.At()
ref, err = app.Append(ref, lset, t, v)
case chunkenc.ValHistogram:
t, h := it.AtHistogram()
t, h := it.AtHistogram(nil)
ref, err = app.AppendHistogram(ref, lset, t, h, nil)
case chunkenc.ValFloatHistogram:
t, fh := it.AtFloatHistogram()
t, fh := it.AtFloatHistogram(nil)
ref, err = app.AppendHistogram(ref, lset, t, nil, fh)
default:
return "", fmt.Errorf("unknown sample type %s", typ.String())

View file

@ -123,7 +123,7 @@ Loop:
case chunkenc.ValFloat:
t, f = it.At()
case chunkenc.ValFloatHistogram, chunkenc.ValHistogram:
t, fh = it.AtFloatHistogram()
t, fh = it.AtFloatHistogram(nil)
default:
sample, ok := it.PeekBack(1)
if !ok {