From 462240bc787793dfe62c05c335f98b84401e4da0 Mon Sep 17 00:00:00 2001 From: beorn7 Date: Fri, 9 Dec 2022 00:44:48 +0100 Subject: [PATCH] storage: add specialized buffers to sampleRing This utilizes the fact that most sampleRings will only contain samples of one type. In this case, the generic interface is circumvented, and a bespoke buffer for the one actually occurring sample type is used. Should a sampleRing receive a sample of a different kind later, it will transparently switch to the generic behavior. Signed-off-by: beorn7 --- storage/buffer.go | 288 +++++++++++++++++++++++++++++++++++------ storage/buffer_test.go | 2 +- 2 files changed, 252 insertions(+), 38 deletions(-) diff --git a/storage/buffer.go b/storage/buffer.go index 68210facc..9a9aff2af 100644 --- a/storage/buffer.go +++ b/storage/buffer.go @@ -44,7 +44,7 @@ func NewBuffer(delta int64) *BufferedSeriesIterator { func NewBufferIterator(it chunkenc.Iterator, delta int64) *BufferedSeriesIterator { // TODO(codesome): based on encoding, allocate different buffer. bit := &BufferedSeriesIterator{ - buf: newSampleRing(delta, 16), + buf: newSampleRing(delta, 0, chunkenc.ValNone), delta: delta, } bit.Reset(it) @@ -121,13 +121,13 @@ func (b *BufferedSeriesIterator) Next() chunkenc.ValueType { return chunkenc.ValNone case chunkenc.ValFloat: t, f := b.it.At() - b.buf.add(fSample{t: t, f: f}) + b.buf.addF(fSample{t: t, f: f}) case chunkenc.ValHistogram: t, h := b.it.AtHistogram() - b.buf.add(hSample{t: t, h: h}) + b.buf.addH(hSample{t: t, h: h}) case chunkenc.ValFloatHistogram: t, fh := b.it.AtFloatHistogram() - b.buf.add(fhSample{t: t, fh: fh}) + b.buf.addFH(fhSample{t: t, fh: fh}) default: panic(fmt.Errorf("BufferedSeriesIterator: unknown value type %v", b.valueType)) } @@ -242,18 +242,44 @@ func (s fhSample) Type() chunkenc.ValueType { type sampleRing struct { delta int64 - buf []tsdbutil.Sample // lookback buffer - i int // position of most recent element in ring buffer - f int // position of first element in ring buffer - l int // number of elements in buffer + // Lookback buffers. We use buf for mixed samples, but one of the three + // concrete ones for homogenous samples. (Only one of the four bufs is + // allowed to be populated!) This avoids the overhead of the interface + // wrapper for the happy (and by far most common) case of homogenous + // samples. + buf []tsdbutil.Sample + fBuf []fSample + hBuf []hSample + fhBuf []fhSample + + i int // Position of most recent element in ring buffer. + f int // Position of first element in ring buffer. + l int // Number of elements in buffer. it sampleRingIterator } -func newSampleRing(delta int64, sz int) *sampleRing { - r := &sampleRing{delta: delta, buf: make([]tsdbutil.Sample, sz)} +// newSampleRing creates a new sampleRing. If you do not know the prefereed +// value type yet, use a size of 0 (in which case the provided typ doesn't +// matter). On the first add, a buffer of size 16 will be allocated with the +// preferred type being the type of the first added sample. +func newSampleRing(delta int64, size int, typ chunkenc.ValueType) *sampleRing { + r := &sampleRing{delta: delta} r.reset() - + if size <= 0 { + // Will initialize on first add. + return r + } + switch typ { + case chunkenc.ValFloat: + r.fBuf = make([]fSample, size) + case chunkenc.ValHistogram: + r.hBuf = make([]hSample, size) + case chunkenc.ValFloatHistogram: + r.fhBuf = make([]fhSample, size) + default: + r.buf = make([]tsdbutil.Sample, size) + } return r } @@ -274,7 +300,7 @@ type sampleRingIterator struct { r *sampleRing i int t int64 - v float64 + f float64 h *histogram.Histogram fh *histogram.FloatHistogram } @@ -284,6 +310,23 @@ func (it *sampleRingIterator) Next() chunkenc.ValueType { if it.i >= it.r.l { return chunkenc.ValNone } + switch { + case len(it.r.fBuf) > 0: + s := it.r.atF(it.i) + it.t = s.t + it.f = s.f + return chunkenc.ValFloat + case len(it.r.hBuf) > 0: + s := it.r.atH(it.i) + it.t = s.t + it.h = s.h + return chunkenc.ValHistogram + case len(it.r.fhBuf) > 0: + s := it.r.atFH(it.i) + it.t = s.t + it.fh = s.fh + return chunkenc.ValFloatHistogram + } s := it.r.at(it.i) it.t = s.T() switch s.Type() { @@ -294,7 +337,7 @@ func (it *sampleRingIterator) Next() chunkenc.ValueType { it.fh = s.FH() return chunkenc.ValFloatHistogram default: - it.v = s.V() + it.f = s.V() return chunkenc.ValFloat } } @@ -308,7 +351,7 @@ func (it *sampleRingIterator) Err() error { } func (it *sampleRingIterator) At() (int64, float64) { - return it.t, it.v + return it.t, it.f } func (it *sampleRingIterator) AtHistogram() (int64, *histogram.Histogram) { @@ -331,17 +374,128 @@ func (r *sampleRing) at(i int) tsdbutil.Sample { return r.buf[j] } -// add adds a sample to the ring buffer and frees all samples that fall -// out of the delta range. -func (r *sampleRing) add(s tsdbutil.Sample) { - l := len(r.buf) - // Grow the ring buffer if it fits no more elements. - if l == r.l { - buf := make([]tsdbutil.Sample, 2*l) - copy(buf[l+r.f:], r.buf[r.f:]) - copy(buf, r.buf[:r.f]) +func (r *sampleRing) atF(i int) fSample { + j := (r.f + i) % len(r.fBuf) + return r.fBuf[j] +} - r.buf = buf +func (r *sampleRing) atH(i int) hSample { + j := (r.f + i) % len(r.hBuf) + return r.hBuf[j] +} + +func (r *sampleRing) atFH(i int) fhSample { + j := (r.f + i) % len(r.fhBuf) + return r.fhBuf[j] +} + +// add adds a sample to the ring buffer and frees all samples that fall out of +// the delta range. Note that this method works for any sample +// implementation. If you know you are dealing with one of the implementations +// from this package (fSample, hSample, fhSample), call one of the specialized +// methods addF, addH, or addFH for better performance. +func (r *sampleRing) add(s tsdbutil.Sample) { + if len(r.buf) == 0 { + // Nothing added to the interface buf yet. Let's check if we can + // stay specialized. + switch s := s.(type) { + case fSample: + if len(r.hBuf)+len(r.fhBuf) == 0 { + r.fBuf = genericAdd(s, r.fBuf, r) + return + } + case hSample: + if len(r.fBuf)+len(r.fhBuf) == 0 { + r.hBuf = genericAdd(s, r.hBuf, r) + return + } + case fhSample: + if len(r.fBuf)+len(r.hBuf) == 0 { + r.fhBuf = genericAdd(s, r.fhBuf, r) + return + } + } + // The new sample isn't a fit for the already existing + // ones. Copy the latter into the interface buffer where needed. + switch { + case len(r.fBuf) > 0: + for _, s := range r.fBuf { + r.buf = append(r.buf, s) + } + r.fBuf = nil + case len(r.hBuf) > 0: + for _, s := range r.hBuf { + r.buf = append(r.buf, s) + } + r.hBuf = nil + case len(r.fhBuf) > 0: + for _, s := range r.fhBuf { + r.buf = append(r.buf, s) + } + r.fhBuf = nil + } + } + r.buf = genericAdd(s, r.buf, r) +} + +// addF is a version of the add method specialized for fSample. +func (r *sampleRing) addF(s fSample) { + switch { + case len(r.buf) > 0: + // Already have interface samples. Add to the interface buf. + r.buf = genericAdd[tsdbutil.Sample](s, r.buf, r) + case len(r.hBuf)+len(r.fhBuf) > 0: + // Already have specialized samples that are not fSamples. + // Need to call the checked add method for conversion. + r.add(s) + default: + r.fBuf = genericAdd(s, r.fBuf, r) + } +} + +// addH is a version of the add method specialized for hSample. +func (r *sampleRing) addH(s hSample) { + switch { + case len(r.buf) > 0: + // Already have interface samples. Add to the interface buf. + r.buf = genericAdd[tsdbutil.Sample](s, r.buf, r) + case len(r.fBuf)+len(r.fhBuf) > 0: + // Already have samples that are not hSamples. + // Need to call the checked add method for conversion. + r.add(s) + default: + r.hBuf = genericAdd(s, r.hBuf, r) + } +} + +// addFH is a version of the add method specialized for fhSample. +func (r *sampleRing) addFH(s fhSample) { + switch { + case len(r.buf) > 0: + // Already have interface samples. Add to the interface buf. + r.buf = genericAdd[tsdbutil.Sample](s, r.buf, r) + case len(r.fBuf)+len(r.hBuf) > 0: + // Already have samples that are not fhSamples. + // Need to call the checked add method for conversion. + r.add(s) + default: + r.fhBuf = genericAdd(s, r.fhBuf, r) + } +} + +func genericAdd[T tsdbutil.Sample](s T, buf []T, r *sampleRing) []T { + l := len(buf) + // Grow the ring buffer if it fits no more elements. + if l == 0 { + buf = make([]T, 16) + l = 16 + } + if l == r.l { + newBuf := make([]T, 2*l) + copy(newBuf[l+r.f:], buf[r.f:]) + copy(newBuf, buf[:r.f]) + + buf = newBuf r.i = r.f r.f += l l = 2 * l @@ -352,18 +506,19 @@ func (r *sampleRing) add(s tsdbutil.Sample) { } } - r.buf[r.i] = s + buf[r.i] = s r.l++ // Free head of the buffer of samples that just fell out of the range. tmin := s.T() - r.delta - for r.buf[r.f].T() < tmin { + for buf[r.f].T() < tmin { r.f++ if r.f >= l { r.f -= l } r.l-- } + return buf } // reduceDelta lowers the buffered time delta, dropping any samples that are @@ -378,17 +533,30 @@ func (r *sampleRing) reduceDelta(delta int64) bool { return true } + switch { + case len(r.fBuf) > 0: + genericReduceDelta(r.fBuf, r) + case len(r.hBuf) > 0: + genericReduceDelta(r.hBuf, r) + case len(r.fhBuf) > 0: + genericReduceDelta(r.fhBuf, r) + default: + genericReduceDelta(r.buf, r) + } + return true +} + +func genericReduceDelta[T tsdbutil.Sample](buf []T, r *sampleRing) { // Free head of the buffer of samples that just fell out of the range. - l := len(r.buf) - tmin := r.buf[r.i].T() - delta - for r.buf[r.f].T() < tmin { + l := len(buf) + tmin := buf[r.i].T() - r.delta + for buf[r.f].T() < tmin { r.f++ if r.f >= l { r.f -= l } r.l-- } - return true } // nthLast returns the nth most recent element added to the ring. @@ -396,7 +564,17 @@ func (r *sampleRing) nthLast(n int) (tsdbutil.Sample, bool) { if n > r.l { return fSample{}, false } - return r.at(r.l - n), true + i := r.l - n + switch { + case len(r.fBuf) > 0: + return r.atF(i), true + case len(r.hBuf) > 0: + return r.atH(i), true + case len(r.fhBuf) > 0: + return r.atFH(i), true + default: + return r.at(i), true + } } func (r *sampleRing) samples() []tsdbutil.Sample { @@ -404,13 +582,49 @@ func (r *sampleRing) samples() []tsdbutil.Sample { k := r.f + r.l var j int - if k > len(r.buf) { - k = len(r.buf) - j = r.l - k + r.f - } - n := copy(res, r.buf[r.f:k]) - copy(res[n:], r.buf[:j]) + switch { + case len(r.buf) > 0: + if k > len(r.buf) { + k = len(r.buf) + j = r.l - k + r.f + } + n := copy(res, r.buf[r.f:k]) + copy(res[n:], r.buf[:j]) + case len(r.fBuf) > 0: + if k > len(r.fBuf) { + k = len(r.fBuf) + j = r.l - k + r.f + } + resF := make([]fSample, r.l) + n := copy(resF, r.fBuf[r.f:k]) + copy(resF[n:], r.fBuf[:j]) + for i, s := range resF { + res[i] = s + } + case len(r.hBuf) > 0: + if k > len(r.hBuf) { + k = len(r.hBuf) + j = r.l - k + r.f + } + resH := make([]hSample, r.l) + n := copy(resH, r.hBuf[r.f:k]) + copy(resH[n:], r.hBuf[:j]) + for i, s := range resH { + res[i] = s + } + case len(r.fhBuf) > 0: + if k > len(r.fhBuf) { + k = len(r.fhBuf) + j = r.l - k + r.f + } + resFH := make([]fhSample, r.l) + n := copy(resFH, r.fhBuf[r.f:k]) + copy(resFH[n:], r.fhBuf[:j]) + for i, s := range resFH { + res[i] = s + } + } return res } diff --git a/storage/buffer_test.go b/storage/buffer_test.go index 7cfda4d4c..2fd9e81d0 100644 --- a/storage/buffer_test.go +++ b/storage/buffer_test.go @@ -56,7 +56,7 @@ func TestSampleRing(t *testing.T) { }, } for _, c := range cases { - r := newSampleRing(c.delta, c.size) + r := newSampleRing(c.delta, c.size, chunkenc.ValFloat) input := []fSample{} for _, t := range c.input {