mirror of
https://github.com/prometheus/prometheus.git
synced 2024-11-09 23:24:05 -08:00
storage: add specialized buffers to sampleRing
This utilizes the fact that most sampleRings will only contain samples of one type. In this case, the generic interface is circumvented, and a bespoke buffer for the one actually occurring sample type is used. Should a sampleRing receive a sample of a different kind later, it will transparently switch to the generic behavior. Signed-off-by: beorn7 <beorn@grafana.com>
This commit is contained in:
parent
630bcb494b
commit
462240bc78
|
@ -44,7 +44,7 @@ func NewBuffer(delta int64) *BufferedSeriesIterator {
|
|||
func NewBufferIterator(it chunkenc.Iterator, delta int64) *BufferedSeriesIterator {
|
||||
// TODO(codesome): based on encoding, allocate different buffer.
|
||||
bit := &BufferedSeriesIterator{
|
||||
buf: newSampleRing(delta, 16),
|
||||
buf: newSampleRing(delta, 0, chunkenc.ValNone),
|
||||
delta: delta,
|
||||
}
|
||||
bit.Reset(it)
|
||||
|
@ -121,13 +121,13 @@ func (b *BufferedSeriesIterator) Next() chunkenc.ValueType {
|
|||
return chunkenc.ValNone
|
||||
case chunkenc.ValFloat:
|
||||
t, f := b.it.At()
|
||||
b.buf.add(fSample{t: t, f: f})
|
||||
b.buf.addF(fSample{t: t, f: f})
|
||||
case chunkenc.ValHistogram:
|
||||
t, h := b.it.AtHistogram()
|
||||
b.buf.add(hSample{t: t, h: h})
|
||||
b.buf.addH(hSample{t: t, h: h})
|
||||
case chunkenc.ValFloatHistogram:
|
||||
t, fh := b.it.AtFloatHistogram()
|
||||
b.buf.add(fhSample{t: t, fh: fh})
|
||||
b.buf.addFH(fhSample{t: t, fh: fh})
|
||||
default:
|
||||
panic(fmt.Errorf("BufferedSeriesIterator: unknown value type %v", b.valueType))
|
||||
}
|
||||
|
@ -242,18 +242,44 @@ func (s fhSample) Type() chunkenc.ValueType {
|
|||
type sampleRing struct {
|
||||
delta int64
|
||||
|
||||
buf []tsdbutil.Sample // lookback buffer
|
||||
i int // position of most recent element in ring buffer
|
||||
f int // position of first element in ring buffer
|
||||
l int // number of elements in buffer
|
||||
// Lookback buffers. We use buf for mixed samples, but one of the three
|
||||
// concrete ones for homogenous samples. (Only one of the four bufs is
|
||||
// allowed to be populated!) This avoids the overhead of the interface
|
||||
// wrapper for the happy (and by far most common) case of homogenous
|
||||
// samples.
|
||||
buf []tsdbutil.Sample
|
||||
fBuf []fSample
|
||||
hBuf []hSample
|
||||
fhBuf []fhSample
|
||||
|
||||
i int // Position of most recent element in ring buffer.
|
||||
f int // Position of first element in ring buffer.
|
||||
l int // Number of elements in buffer.
|
||||
|
||||
it sampleRingIterator
|
||||
}
|
||||
|
||||
func newSampleRing(delta int64, sz int) *sampleRing {
|
||||
r := &sampleRing{delta: delta, buf: make([]tsdbutil.Sample, sz)}
|
||||
// newSampleRing creates a new sampleRing. If you do not know the prefereed
|
||||
// value type yet, use a size of 0 (in which case the provided typ doesn't
|
||||
// matter). On the first add, a buffer of size 16 will be allocated with the
|
||||
// preferred type being the type of the first added sample.
|
||||
func newSampleRing(delta int64, size int, typ chunkenc.ValueType) *sampleRing {
|
||||
r := &sampleRing{delta: delta}
|
||||
r.reset()
|
||||
|
||||
if size <= 0 {
|
||||
// Will initialize on first add.
|
||||
return r
|
||||
}
|
||||
switch typ {
|
||||
case chunkenc.ValFloat:
|
||||
r.fBuf = make([]fSample, size)
|
||||
case chunkenc.ValHistogram:
|
||||
r.hBuf = make([]hSample, size)
|
||||
case chunkenc.ValFloatHistogram:
|
||||
r.fhBuf = make([]fhSample, size)
|
||||
default:
|
||||
r.buf = make([]tsdbutil.Sample, size)
|
||||
}
|
||||
return r
|
||||
}
|
||||
|
||||
|
@ -274,7 +300,7 @@ type sampleRingIterator struct {
|
|||
r *sampleRing
|
||||
i int
|
||||
t int64
|
||||
v float64
|
||||
f float64
|
||||
h *histogram.Histogram
|
||||
fh *histogram.FloatHistogram
|
||||
}
|
||||
|
@ -284,6 +310,23 @@ func (it *sampleRingIterator) Next() chunkenc.ValueType {
|
|||
if it.i >= it.r.l {
|
||||
return chunkenc.ValNone
|
||||
}
|
||||
switch {
|
||||
case len(it.r.fBuf) > 0:
|
||||
s := it.r.atF(it.i)
|
||||
it.t = s.t
|
||||
it.f = s.f
|
||||
return chunkenc.ValFloat
|
||||
case len(it.r.hBuf) > 0:
|
||||
s := it.r.atH(it.i)
|
||||
it.t = s.t
|
||||
it.h = s.h
|
||||
return chunkenc.ValHistogram
|
||||
case len(it.r.fhBuf) > 0:
|
||||
s := it.r.atFH(it.i)
|
||||
it.t = s.t
|
||||
it.fh = s.fh
|
||||
return chunkenc.ValFloatHistogram
|
||||
}
|
||||
s := it.r.at(it.i)
|
||||
it.t = s.T()
|
||||
switch s.Type() {
|
||||
|
@ -294,7 +337,7 @@ func (it *sampleRingIterator) Next() chunkenc.ValueType {
|
|||
it.fh = s.FH()
|
||||
return chunkenc.ValFloatHistogram
|
||||
default:
|
||||
it.v = s.V()
|
||||
it.f = s.V()
|
||||
return chunkenc.ValFloat
|
||||
}
|
||||
}
|
||||
|
@ -308,7 +351,7 @@ func (it *sampleRingIterator) Err() error {
|
|||
}
|
||||
|
||||
func (it *sampleRingIterator) At() (int64, float64) {
|
||||
return it.t, it.v
|
||||
return it.t, it.f
|
||||
}
|
||||
|
||||
func (it *sampleRingIterator) AtHistogram() (int64, *histogram.Histogram) {
|
||||
|
@ -331,17 +374,128 @@ func (r *sampleRing) at(i int) tsdbutil.Sample {
|
|||
return r.buf[j]
|
||||
}
|
||||
|
||||
// add adds a sample to the ring buffer and frees all samples that fall
|
||||
// out of the delta range.
|
||||
func (r *sampleRing) add(s tsdbutil.Sample) {
|
||||
l := len(r.buf)
|
||||
// Grow the ring buffer if it fits no more elements.
|
||||
if l == r.l {
|
||||
buf := make([]tsdbutil.Sample, 2*l)
|
||||
copy(buf[l+r.f:], r.buf[r.f:])
|
||||
copy(buf, r.buf[:r.f])
|
||||
func (r *sampleRing) atF(i int) fSample {
|
||||
j := (r.f + i) % len(r.fBuf)
|
||||
return r.fBuf[j]
|
||||
}
|
||||
|
||||
r.buf = buf
|
||||
func (r *sampleRing) atH(i int) hSample {
|
||||
j := (r.f + i) % len(r.hBuf)
|
||||
return r.hBuf[j]
|
||||
}
|
||||
|
||||
func (r *sampleRing) atFH(i int) fhSample {
|
||||
j := (r.f + i) % len(r.fhBuf)
|
||||
return r.fhBuf[j]
|
||||
}
|
||||
|
||||
// add adds a sample to the ring buffer and frees all samples that fall out of
|
||||
// the delta range. Note that this method works for any sample
|
||||
// implementation. If you know you are dealing with one of the implementations
|
||||
// from this package (fSample, hSample, fhSample), call one of the specialized
|
||||
// methods addF, addH, or addFH for better performance.
|
||||
func (r *sampleRing) add(s tsdbutil.Sample) {
|
||||
if len(r.buf) == 0 {
|
||||
// Nothing added to the interface buf yet. Let's check if we can
|
||||
// stay specialized.
|
||||
switch s := s.(type) {
|
||||
case fSample:
|
||||
if len(r.hBuf)+len(r.fhBuf) == 0 {
|
||||
r.fBuf = genericAdd(s, r.fBuf, r)
|
||||
return
|
||||
}
|
||||
case hSample:
|
||||
if len(r.fBuf)+len(r.fhBuf) == 0 {
|
||||
r.hBuf = genericAdd(s, r.hBuf, r)
|
||||
return
|
||||
}
|
||||
case fhSample:
|
||||
if len(r.fBuf)+len(r.hBuf) == 0 {
|
||||
r.fhBuf = genericAdd(s, r.fhBuf, r)
|
||||
return
|
||||
}
|
||||
}
|
||||
// The new sample isn't a fit for the already existing
|
||||
// ones. Copy the latter into the interface buffer where needed.
|
||||
switch {
|
||||
case len(r.fBuf) > 0:
|
||||
for _, s := range r.fBuf {
|
||||
r.buf = append(r.buf, s)
|
||||
}
|
||||
r.fBuf = nil
|
||||
case len(r.hBuf) > 0:
|
||||
for _, s := range r.hBuf {
|
||||
r.buf = append(r.buf, s)
|
||||
}
|
||||
r.hBuf = nil
|
||||
case len(r.fhBuf) > 0:
|
||||
for _, s := range r.fhBuf {
|
||||
r.buf = append(r.buf, s)
|
||||
}
|
||||
r.fhBuf = nil
|
||||
}
|
||||
}
|
||||
r.buf = genericAdd(s, r.buf, r)
|
||||
}
|
||||
|
||||
// addF is a version of the add method specialized for fSample.
|
||||
func (r *sampleRing) addF(s fSample) {
|
||||
switch {
|
||||
case len(r.buf) > 0:
|
||||
// Already have interface samples. Add to the interface buf.
|
||||
r.buf = genericAdd[tsdbutil.Sample](s, r.buf, r)
|
||||
case len(r.hBuf)+len(r.fhBuf) > 0:
|
||||
// Already have specialized samples that are not fSamples.
|
||||
// Need to call the checked add method for conversion.
|
||||
r.add(s)
|
||||
default:
|
||||
r.fBuf = genericAdd(s, r.fBuf, r)
|
||||
}
|
||||
}
|
||||
|
||||
// addH is a version of the add method specialized for hSample.
|
||||
func (r *sampleRing) addH(s hSample) {
|
||||
switch {
|
||||
case len(r.buf) > 0:
|
||||
// Already have interface samples. Add to the interface buf.
|
||||
r.buf = genericAdd[tsdbutil.Sample](s, r.buf, r)
|
||||
case len(r.fBuf)+len(r.fhBuf) > 0:
|
||||
// Already have samples that are not hSamples.
|
||||
// Need to call the checked add method for conversion.
|
||||
r.add(s)
|
||||
default:
|
||||
r.hBuf = genericAdd(s, r.hBuf, r)
|
||||
}
|
||||
}
|
||||
|
||||
// addFH is a version of the add method specialized for fhSample.
|
||||
func (r *sampleRing) addFH(s fhSample) {
|
||||
switch {
|
||||
case len(r.buf) > 0:
|
||||
// Already have interface samples. Add to the interface buf.
|
||||
r.buf = genericAdd[tsdbutil.Sample](s, r.buf, r)
|
||||
case len(r.fBuf)+len(r.hBuf) > 0:
|
||||
// Already have samples that are not fhSamples.
|
||||
// Need to call the checked add method for conversion.
|
||||
r.add(s)
|
||||
default:
|
||||
r.fhBuf = genericAdd(s, r.fhBuf, r)
|
||||
}
|
||||
}
|
||||
|
||||
func genericAdd[T tsdbutil.Sample](s T, buf []T, r *sampleRing) []T {
|
||||
l := len(buf)
|
||||
// Grow the ring buffer if it fits no more elements.
|
||||
if l == 0 {
|
||||
buf = make([]T, 16)
|
||||
l = 16
|
||||
}
|
||||
if l == r.l {
|
||||
newBuf := make([]T, 2*l)
|
||||
copy(newBuf[l+r.f:], buf[r.f:])
|
||||
copy(newBuf, buf[:r.f])
|
||||
|
||||
buf = newBuf
|
||||
r.i = r.f
|
||||
r.f += l
|
||||
l = 2 * l
|
||||
|
@ -352,18 +506,19 @@ func (r *sampleRing) add(s tsdbutil.Sample) {
|
|||
}
|
||||
}
|
||||
|
||||
r.buf[r.i] = s
|
||||
buf[r.i] = s
|
||||
r.l++
|
||||
|
||||
// Free head of the buffer of samples that just fell out of the range.
|
||||
tmin := s.T() - r.delta
|
||||
for r.buf[r.f].T() < tmin {
|
||||
for buf[r.f].T() < tmin {
|
||||
r.f++
|
||||
if r.f >= l {
|
||||
r.f -= l
|
||||
}
|
||||
r.l--
|
||||
}
|
||||
return buf
|
||||
}
|
||||
|
||||
// reduceDelta lowers the buffered time delta, dropping any samples that are
|
||||
|
@ -378,17 +533,30 @@ func (r *sampleRing) reduceDelta(delta int64) bool {
|
|||
return true
|
||||
}
|
||||
|
||||
switch {
|
||||
case len(r.fBuf) > 0:
|
||||
genericReduceDelta(r.fBuf, r)
|
||||
case len(r.hBuf) > 0:
|
||||
genericReduceDelta(r.hBuf, r)
|
||||
case len(r.fhBuf) > 0:
|
||||
genericReduceDelta(r.fhBuf, r)
|
||||
default:
|
||||
genericReduceDelta(r.buf, r)
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
func genericReduceDelta[T tsdbutil.Sample](buf []T, r *sampleRing) {
|
||||
// Free head of the buffer of samples that just fell out of the range.
|
||||
l := len(r.buf)
|
||||
tmin := r.buf[r.i].T() - delta
|
||||
for r.buf[r.f].T() < tmin {
|
||||
l := len(buf)
|
||||
tmin := buf[r.i].T() - r.delta
|
||||
for buf[r.f].T() < tmin {
|
||||
r.f++
|
||||
if r.f >= l {
|
||||
r.f -= l
|
||||
}
|
||||
r.l--
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
// nthLast returns the nth most recent element added to the ring.
|
||||
|
@ -396,7 +564,17 @@ func (r *sampleRing) nthLast(n int) (tsdbutil.Sample, bool) {
|
|||
if n > r.l {
|
||||
return fSample{}, false
|
||||
}
|
||||
return r.at(r.l - n), true
|
||||
i := r.l - n
|
||||
switch {
|
||||
case len(r.fBuf) > 0:
|
||||
return r.atF(i), true
|
||||
case len(r.hBuf) > 0:
|
||||
return r.atH(i), true
|
||||
case len(r.fhBuf) > 0:
|
||||
return r.atFH(i), true
|
||||
default:
|
||||
return r.at(i), true
|
||||
}
|
||||
}
|
||||
|
||||
func (r *sampleRing) samples() []tsdbutil.Sample {
|
||||
|
@ -404,13 +582,49 @@ func (r *sampleRing) samples() []tsdbutil.Sample {
|
|||
|
||||
k := r.f + r.l
|
||||
var j int
|
||||
if k > len(r.buf) {
|
||||
k = len(r.buf)
|
||||
j = r.l - k + r.f
|
||||
}
|
||||
|
||||
n := copy(res, r.buf[r.f:k])
|
||||
copy(res[n:], r.buf[:j])
|
||||
switch {
|
||||
case len(r.buf) > 0:
|
||||
if k > len(r.buf) {
|
||||
k = len(r.buf)
|
||||
j = r.l - k + r.f
|
||||
}
|
||||
n := copy(res, r.buf[r.f:k])
|
||||
copy(res[n:], r.buf[:j])
|
||||
case len(r.fBuf) > 0:
|
||||
if k > len(r.fBuf) {
|
||||
k = len(r.fBuf)
|
||||
j = r.l - k + r.f
|
||||
}
|
||||
resF := make([]fSample, r.l)
|
||||
n := copy(resF, r.fBuf[r.f:k])
|
||||
copy(resF[n:], r.fBuf[:j])
|
||||
for i, s := range resF {
|
||||
res[i] = s
|
||||
}
|
||||
case len(r.hBuf) > 0:
|
||||
if k > len(r.hBuf) {
|
||||
k = len(r.hBuf)
|
||||
j = r.l - k + r.f
|
||||
}
|
||||
resH := make([]hSample, r.l)
|
||||
n := copy(resH, r.hBuf[r.f:k])
|
||||
copy(resH[n:], r.hBuf[:j])
|
||||
for i, s := range resH {
|
||||
res[i] = s
|
||||
}
|
||||
case len(r.fhBuf) > 0:
|
||||
if k > len(r.fhBuf) {
|
||||
k = len(r.fhBuf)
|
||||
j = r.l - k + r.f
|
||||
}
|
||||
resFH := make([]fhSample, r.l)
|
||||
n := copy(resFH, r.fhBuf[r.f:k])
|
||||
copy(resFH[n:], r.fhBuf[:j])
|
||||
for i, s := range resFH {
|
||||
res[i] = s
|
||||
}
|
||||
}
|
||||
|
||||
return res
|
||||
}
|
||||
|
|
|
@ -56,7 +56,7 @@ func TestSampleRing(t *testing.T) {
|
|||
},
|
||||
}
|
||||
for _, c := range cases {
|
||||
r := newSampleRing(c.delta, c.size)
|
||||
r := newSampleRing(c.delta, c.size, chunkenc.ValFloat)
|
||||
|
||||
input := []fSample{}
|
||||
for _, t := range c.input {
|
||||
|
|
Loading…
Reference in a new issue