SampleRingIterator: add currType field

Signed-off-by: Yuri Nikolic <durica.nikolic@grafana.com>
This commit is contained in:
Yuri Nikolic 2024-03-01 14:49:42 +01:00
parent d5f0a240fa
commit d5ab1851dc
2 changed files with 97 additions and 19 deletions

View file

@ -316,35 +316,40 @@ func (r *sampleRing) iterator() *SampleRingIterator {
// SampleRingIterator is returned by BufferedSeriesIterator.Buffer() and can be
// used to iterate samples buffered in the lookback window.
type SampleRingIterator struct {
r *sampleRing
i int
t int64
f float64
h *histogram.Histogram
fh *histogram.FloatHistogram
r *sampleRing
i int
t int64
f float64
h *histogram.Histogram
fh *histogram.FloatHistogram
currType chunkenc.ValueType
}
func (it *SampleRingIterator) Next() chunkenc.ValueType {
it.i++
it.currType = chunkenc.ValNone
if it.i >= it.r.l {
return chunkenc.ValNone
return it.currType
}
switch it.r.bufInUse {
case fBuf:
s := it.r.atF(it.i)
it.t = s.t
it.f = s.f
return chunkenc.ValFloat
it.currType = chunkenc.ValFloat
case hBuf:
s := it.r.atH(it.i)
it.t = s.t
it.h = s.h
return chunkenc.ValHistogram
it.currType = chunkenc.ValHistogram
case fhBuf:
s := it.r.atFH(it.i)
it.t = s.t
it.fh = s.fh
return chunkenc.ValFloatHistogram
it.currType = chunkenc.ValFloatHistogram
}
if it.currType != chunkenc.ValNone {
return it.currType
}
s := it.r.at(it.i)
it.t = s.T()
@ -352,24 +357,31 @@ func (it *SampleRingIterator) Next() chunkenc.ValueType {
case chunkenc.ValHistogram:
it.h = s.H()
it.fh = nil
return chunkenc.ValHistogram
it.currType = chunkenc.ValHistogram
case chunkenc.ValFloatHistogram:
it.fh = s.FH()
it.h = nil
return chunkenc.ValFloatHistogram
it.currType = chunkenc.ValFloatHistogram
default:
it.f = s.F()
return chunkenc.ValFloat
it.currType = chunkenc.ValFloat
}
return it.currType
}
// At returns the current float element of the iterator.
func (it *SampleRingIterator) At() (int64, float64) {
if it.currType != chunkenc.ValFloat {
panic(fmt.Sprintf("SampleRingIterator: impossible to read a float, current value type is %v", it.currType))
}
return it.t, it.f
}
// AtHistogram returns the current histogram element of the iterator.
func (it *SampleRingIterator) AtHistogram() (int64, *histogram.Histogram) {
if it.currType != chunkenc.ValHistogram {
panic(fmt.Sprintf("SampleRingIterator: impossible to read a histogram, current value type is %v", it.currType))
}
return it.t, it.h
}
@ -378,14 +390,17 @@ func (it *SampleRingIterator) AtHistogram() (int64, *histogram.Histogram) {
// An optional histogram.FloatHistogram can be provided to avoid allocating a new
// object for the conversion.
func (it *SampleRingIterator) AtFloatHistogram(fh *histogram.FloatHistogram) (int64, *histogram.FloatHistogram) {
if it.fh == nil {
return it.t, it.h.ToFloat(fh)
if it.currType != chunkenc.ValHistogram && it.currType != chunkenc.ValFloatHistogram {
panic(fmt.Sprintf("SampleRingIterator: impossible to read a float histogram, current value type is %v", it.currType))
}
if fh != nil {
it.fh.CopyTo(fh)
return it.t, fh
if it.currType == chunkenc.ValFloatHistogram {
if fh != nil {
it.fh.CopyTo(fh)
return it.t, fh
}
return it.t, it.fh.Copy()
}
return it.t, it.fh.Copy()
return it.t, it.h.ToFloat(fh)
}
func (it *SampleRingIterator) AtT() int64 {

View file

@ -138,6 +138,69 @@ func TestSampleRingMixed(t *testing.T) {
require.Equal(t, chunkenc.ValNone, it.Next())
}
func TestSampleRingAtFloatHistogram(t *testing.T) {
fh1 := tsdbutil.GenerateTestFloatHistogram(1)
fh2 := tsdbutil.GenerateTestFloatHistogram(2)
h1 := tsdbutil.GenerateTestHistogram(3)
h2 := tsdbutil.GenerateTestHistogram(4)
// With ValNone as the preferred type, nothing should be initialized.
r := newSampleRing(10, 2, chunkenc.ValNone)
require.Zero(t, len(r.fBuf))
require.Zero(t, len(r.hBuf))
require.Zero(t, len(r.fhBuf))
require.Zero(t, len(r.iBuf))
var (
h *histogram.Histogram
fh *histogram.FloatHistogram
ts int64
)
it := r.iterator()
require.Equal(t, chunkenc.ValNone, it.Next())
r.addFH(fhSample{t: 1, fh: fh1})
r.addFH(fhSample{t: 2, fh: fh2})
it = r.iterator()
require.Equal(t, chunkenc.ValFloatHistogram, it.Next())
ts, fh = it.AtFloatHistogram(fh)
require.Equal(t, int64(1), ts)
require.Equal(t, fh1, fh)
require.Equal(t, chunkenc.ValFloatHistogram, it.Next())
ts, fh = it.AtFloatHistogram(fh)
require.Equal(t, int64(2), ts)
require.Equal(t, fh2, fh)
require.Equal(t, chunkenc.ValNone, it.Next())
r.reset()
it = r.iterator()
require.Equal(t, chunkenc.ValNone, it.Next())
r.addH(hSample{t: 3, h: h1})
r.addH(hSample{t: 4, h: h2})
it = r.iterator()
require.Equal(t, chunkenc.ValHistogram, it.Next())
ts, h = it.AtHistogram()
require.Equal(t, int64(3), ts)
require.Equal(t, h1, h)
ts, fh = it.AtFloatHistogram(fh)
require.Equal(t, int64(3), ts)
require.Equal(t, h1.ToFloat(nil), fh)
require.Equal(t, chunkenc.ValHistogram, it.Next())
ts, h = it.AtHistogram()
require.Equal(t, int64(4), ts)
require.Equal(t, h2, h)
ts, fh = it.AtFloatHistogram(fh)
require.Equal(t, int64(4), ts)
require.Equal(t, h2.ToFloat(nil), fh)
require.Equal(t, chunkenc.ValNone, it.Next())
}
func TestBufferedSeriesIterator(t *testing.T) {
var it *BufferedSeriesIterator