Merge pull request #12326 from prometheus/beorn7/storage

storage: optimise sampleRing
This commit is contained in:
Björn Rabenstein 2023-05-05 21:25:23 +02:00 committed by GitHub
commit d61811ef32
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -242,15 +242,16 @@ func (s fhSample) Type() chunkenc.ValueType {
type sampleRing struct { type sampleRing struct {
delta int64 delta int64
// Lookback buffers. We use buf for mixed samples, but one of the three // Lookback buffers. We use iBuf for mixed samples, but one of the three
// concrete ones for homogenous samples. (Only one of the four bufs is // concrete ones for homogenous samples. (Only one of the four bufs is
// allowed to be populated!) This avoids the overhead of the interface // allowed to be populated!) This avoids the overhead of the interface
// wrapper for the happy (and by far most common) case of homogenous // wrapper for the happy (and by far most common) case of homogenous
// samples. // samples.
buf []tsdbutil.Sample iBuf []tsdbutil.Sample
fBuf []fSample fBuf []fSample
hBuf []hSample hBuf []hSample
fhBuf []fhSample fhBuf []fhSample
bufInUse bufType
i int // Position of most recent element in ring buffer. i int // Position of most recent element in ring buffer.
f int // Position of first element in ring buffer. f int // Position of first element in ring buffer.
@ -259,6 +260,16 @@ type sampleRing struct {
it sampleRingIterator it sampleRingIterator
} }
type bufType int
const (
noBuf bufType = iota // Nothing yet stored in sampleRing.
iBuf
fBuf
hBuf
fhBuf
)
// newSampleRing creates a new sampleRing. If you do not know the prefereed // newSampleRing creates a new sampleRing. If you do not know the prefereed
// value type yet, use a size of 0 (in which case the provided typ doesn't // value type yet, use a size of 0 (in which case the provided typ doesn't
// matter). On the first add, a buffer of size 16 will be allocated with the // matter). On the first add, a buffer of size 16 will be allocated with the
@ -278,7 +289,7 @@ func newSampleRing(delta int64, size int, typ chunkenc.ValueType) *sampleRing {
case chunkenc.ValFloatHistogram: case chunkenc.ValFloatHistogram:
r.fhBuf = make([]fhSample, size) r.fhBuf = make([]fhSample, size)
default: default:
r.buf = make([]tsdbutil.Sample, size) r.iBuf = make([]tsdbutil.Sample, size)
} }
return r return r
} }
@ -287,6 +298,7 @@ func (r *sampleRing) reset() {
r.l = 0 r.l = 0
r.i = -1 r.i = -1
r.f = 0 r.f = 0
r.bufInUse = noBuf
} }
// Returns the current iterator. Invalidates previously returned iterators. // Returns the current iterator. Invalidates previously returned iterators.
@ -310,18 +322,18 @@ func (it *sampleRingIterator) Next() chunkenc.ValueType {
if it.i >= it.r.l { if it.i >= it.r.l {
return chunkenc.ValNone return chunkenc.ValNone
} }
switch { switch it.r.bufInUse {
case len(it.r.fBuf) > 0: case fBuf:
s := it.r.atF(it.i) s := it.r.atF(it.i)
it.t = s.t it.t = s.t
it.f = s.f it.f = s.f
return chunkenc.ValFloat return chunkenc.ValFloat
case len(it.r.hBuf) > 0: case hBuf:
s := it.r.atH(it.i) s := it.r.atH(it.i)
it.t = s.t it.t = s.t
it.h = s.h it.h = s.h
return chunkenc.ValHistogram return chunkenc.ValHistogram
case len(it.r.fhBuf) > 0: case fhBuf:
s := it.r.atFH(it.i) s := it.r.atFH(it.i)
it.t = s.t it.t = s.t
it.fh = s.fh it.fh = s.fh
@ -372,8 +384,8 @@ func (it *sampleRingIterator) AtT() int64 {
} }
func (r *sampleRing) at(i int) tsdbutil.Sample { func (r *sampleRing) at(i int) tsdbutil.Sample {
j := (r.f + i) % len(r.buf) j := (r.f + i) % len(r.iBuf)
return r.buf[j] return r.iBuf[j]
} }
func (r *sampleRing) atF(i int) fSample { func (r *sampleRing) atF(i int) fSample {
@ -397,91 +409,113 @@ func (r *sampleRing) atFH(i int) fhSample {
// from this package (fSample, hSample, fhSample), call one of the specialized // from this package (fSample, hSample, fhSample), call one of the specialized
// methods addF, addH, or addFH for better performance. // methods addF, addH, or addFH for better performance.
func (r *sampleRing) add(s tsdbutil.Sample) { func (r *sampleRing) add(s tsdbutil.Sample) {
if len(r.buf) == 0 { if r.bufInUse == noBuf {
// First sample.
switch s := s.(type) {
case fSample:
r.bufInUse = fBuf
r.fBuf = addF(s, r.fBuf, r)
case hSample:
r.bufInUse = hBuf
r.hBuf = addH(s, r.hBuf, r)
case fhSample:
r.bufInUse = fhBuf
r.fhBuf = addFH(s, r.fhBuf, r)
}
return
}
if r.bufInUse != iBuf {
// Nothing added to the interface buf yet. Let's check if we can // Nothing added to the interface buf yet. Let's check if we can
// stay specialized. // stay specialized.
switch s := s.(type) { switch s := s.(type) {
case fSample: case fSample:
if len(r.hBuf)+len(r.fhBuf) == 0 { if r.bufInUse == fBuf {
r.fBuf = addF(s, r.fBuf, r) r.fBuf = addF(s, r.fBuf, r)
return return
} }
case hSample: case hSample:
if len(r.fBuf)+len(r.fhBuf) == 0 { if r.bufInUse == hBuf {
r.hBuf = addH(s, r.hBuf, r) r.hBuf = addH(s, r.hBuf, r)
return return
} }
case fhSample: case fhSample:
if len(r.fBuf)+len(r.hBuf) == 0 { if r.bufInUse == fhBuf {
r.fhBuf = addFH(s, r.fhBuf, r) r.fhBuf = addFH(s, r.fhBuf, r)
return return
} }
} }
// The new sample isn't a fit for the already existing // The new sample isn't a fit for the already existing
// ones. Copy the latter into the interface buffer where needed. // ones. Copy the latter into the interface buffer where needed.
switch { switch r.bufInUse {
case len(r.fBuf) > 0: case fBuf:
for _, s := range r.fBuf { for _, s := range r.fBuf {
r.buf = append(r.buf, s) r.iBuf = append(r.iBuf, s)
} }
r.fBuf = nil r.fBuf = nil
case len(r.hBuf) > 0: case hBuf:
for _, s := range r.hBuf { for _, s := range r.hBuf {
r.buf = append(r.buf, s) r.iBuf = append(r.iBuf, s)
} }
r.hBuf = nil r.hBuf = nil
case len(r.fhBuf) > 0: case fhBuf:
for _, s := range r.fhBuf { for _, s := range r.fhBuf {
r.buf = append(r.buf, s) r.iBuf = append(r.iBuf, s)
} }
r.fhBuf = nil r.fhBuf = nil
} }
r.bufInUse = iBuf
} }
r.buf = addSample(s, r.buf, r) r.iBuf = addSample(s, r.iBuf, r)
} }
// addF is a version of the add method specialized for fSample. // addF is a version of the add method specialized for fSample.
func (r *sampleRing) addF(s fSample) { func (r *sampleRing) addF(s fSample) {
switch { switch r.bufInUse {
case len(r.buf) > 0: case fBuf: // Add to existing fSamples.
// Already have interface samples. Add to the interface buf. r.fBuf = addF(s, r.fBuf, r)
r.buf = addSample(s, r.buf, r) case noBuf: // Add first sample.
case len(r.hBuf)+len(r.fhBuf) > 0: r.fBuf = addF(s, r.fBuf, r)
r.bufInUse = fBuf
case iBuf: // Already have interface samples. Add to the interface buf.
r.iBuf = addSample(s, r.iBuf, r)
default:
// Already have specialized samples that are not fSamples. // Already have specialized samples that are not fSamples.
// Need to call the checked add method for conversion. // Need to call the checked add method for conversion.
r.add(s) r.add(s)
default:
r.fBuf = addF(s, r.fBuf, r)
} }
} }
// addH is a version of the add method specialized for hSample. // addH is a version of the add method specialized for hSample.
func (r *sampleRing) addH(s hSample) { func (r *sampleRing) addH(s hSample) {
switch { switch r.bufInUse {
case len(r.buf) > 0: case hBuf: // Add to existing hSamples.
// Already have interface samples. Add to the interface buf. r.hBuf = addH(s, r.hBuf, r)
r.buf = addSample(s, r.buf, r) case noBuf: // Add first sample.
case len(r.fBuf)+len(r.fhBuf) > 0: r.hBuf = addH(s, r.hBuf, r)
// Already have samples that are not hSamples. r.bufInUse = hBuf
case iBuf: // Already have interface samples. Add to the interface buf.
r.iBuf = addSample(s, r.iBuf, r)
default:
// Already have specialized samples that are not hSamples.
// Need to call the checked add method for conversion. // Need to call the checked add method for conversion.
r.add(s) r.add(s)
default:
r.hBuf = addH(s, r.hBuf, r)
} }
} }
// addFH is a version of the add method specialized for fhSample. // addFH is a version of the add method specialized for fhSample.
func (r *sampleRing) addFH(s fhSample) { func (r *sampleRing) addFH(s fhSample) {
switch { switch r.bufInUse {
case len(r.buf) > 0: case fhBuf: // Add to existing fhSamples.
// Already have interface samples. Add to the interface buf. r.fhBuf = addFH(s, r.fhBuf, r)
r.buf = addSample(s, r.buf, r) case noBuf: // Add first sample.
case len(r.fBuf)+len(r.hBuf) > 0: r.fhBuf = addFH(s, r.fhBuf, r)
// Already have samples that are not fhSamples. r.bufInUse = fhBuf
case iBuf: // Already have interface samples. Add to the interface buf.
r.iBuf = addSample(s, r.iBuf, r)
default:
// Already have specialized samples that are not fhSamples.
// Need to call the checked add method for conversion. // Need to call the checked add method for conversion.
r.add(s) r.add(s)
default:
r.fhBuf = addFH(s, r.fhBuf, r)
} }
} }
@ -701,15 +735,15 @@ func (r *sampleRing) reduceDelta(delta int64) bool {
return true return true
} }
switch { switch r.bufInUse {
case len(r.fBuf) > 0: case fBuf:
genericReduceDelta(r.fBuf, r) genericReduceDelta(r.fBuf, r)
case len(r.hBuf) > 0: case hBuf:
genericReduceDelta(r.hBuf, r) genericReduceDelta(r.hBuf, r)
case len(r.fhBuf) > 0: case fhBuf:
genericReduceDelta(r.fhBuf, r) genericReduceDelta(r.fhBuf, r)
default: default:
genericReduceDelta(r.buf, r) genericReduceDelta(r.iBuf, r)
} }
return true return true
} }
@ -733,12 +767,12 @@ func (r *sampleRing) nthLast(n int) (tsdbutil.Sample, bool) {
return fSample{}, false return fSample{}, false
} }
i := r.l - n i := r.l - n
switch { switch r.bufInUse {
case len(r.fBuf) > 0: case fBuf:
return r.atF(i), true return r.atF(i), true
case len(r.hBuf) > 0: case hBuf:
return r.atH(i), true return r.atH(i), true
case len(r.fhBuf) > 0: case fhBuf:
return r.atFH(i), true return r.atFH(i), true
default: default:
return r.at(i), true return r.at(i), true
@ -751,15 +785,15 @@ func (r *sampleRing) samples() []tsdbutil.Sample {
k := r.f + r.l k := r.f + r.l
var j int var j int
switch { switch r.bufInUse {
case len(r.buf) > 0: case iBuf:
if k > len(r.buf) { if k > len(r.iBuf) {
k = len(r.buf) k = len(r.iBuf)
j = r.l - k + r.f j = r.l - k + r.f
} }
n := copy(res, r.buf[r.f:k]) n := copy(res, r.iBuf[r.f:k])
copy(res[n:], r.buf[:j]) copy(res[n:], r.iBuf[:j])
case len(r.fBuf) > 0: case fBuf:
if k > len(r.fBuf) { if k > len(r.fBuf) {
k = len(r.fBuf) k = len(r.fBuf)
j = r.l - k + r.f j = r.l - k + r.f
@ -770,7 +804,7 @@ func (r *sampleRing) samples() []tsdbutil.Sample {
for i, s := range resF { for i, s := range resF {
res[i] = s res[i] = s
} }
case len(r.hBuf) > 0: case hBuf:
if k > len(r.hBuf) { if k > len(r.hBuf) {
k = len(r.hBuf) k = len(r.hBuf)
j = r.l - k + r.f j = r.l - k + r.f
@ -781,7 +815,7 @@ func (r *sampleRing) samples() []tsdbutil.Sample {
for i, s := range resH { for i, s := range resH {
res[i] = s res[i] = s
} }
case len(r.fhBuf) > 0: case fhBuf:
if k > len(r.fhBuf) { if k > len(r.fhBuf) {
k = len(r.fhBuf) k = len(r.fhBuf)
j = r.l - k + r.f j = r.l - k + r.f