mirror of
https://github.com/prometheus/prometheus.git
synced 2025-03-05 20:59:13 -08:00
Use zeropool.Pool to workaround SA6002 (#12189)
* Use zeropool.Pool to workaround SA6002 I built a tiny library called https://github.com/colega/zeropool to workaround the SA6002 staticheck issue. While searching for the references of that SA6002 staticheck issues on Github first results was Prometheus itself, with quite a lot of ignores of it. This changes the usages of `sync.Pool` to `zeropool.Pool[T]` where a pointer is not available. Also added a benchmark for HeadAppender Append/Commit when series already exist, which is one of the most usual cases IMO, as I didn't find any. Signed-off-by: Oleg Zaytsev <mail@olegzaytsev.com> * Improve BenchmarkHeadAppender with more cases Signed-off-by: Oleg Zaytsev <mail@olegzaytsev.com> * A little copying is better than a little dependency https://www.youtube.com/watch?v=PAAkCSZUG1c&t=9m28s Signed-off-by: Oleg Zaytsev <mail@olegzaytsev.com> * Fix imports order Signed-off-by: Oleg Zaytsev <mail@olegzaytsev.com> * Add license header Signed-off-by: Oleg Zaytsev <mail@olegzaytsev.com> * Copyright should be on one of the first 3 lines Signed-off-by: Oleg Zaytsev <mail@olegzaytsev.com> * Use require.Equal for testing I don't depend on testify in my lib, but here we have it available. Signed-off-by: Oleg Zaytsev <mail@olegzaytsev.com> * Avoid flaky test Signed-off-by: Oleg Zaytsev <mail@olegzaytsev.com> * Also use zeropool for pointsPool in engine.go Signed-off-by: Oleg Zaytsev <mail@olegzaytsev.com> --------- Signed-off-by: Oleg Zaytsev <mail@olegzaytsev.com>
This commit is contained in:
parent
211ae4f1f0
commit
6e2905a4d4
|
@ -45,6 +45,7 @@ import (
|
||||||
"github.com/prometheus/prometheus/storage"
|
"github.com/prometheus/prometheus/storage"
|
||||||
"github.com/prometheus/prometheus/tsdb/chunkenc"
|
"github.com/prometheus/prometheus/tsdb/chunkenc"
|
||||||
"github.com/prometheus/prometheus/util/stats"
|
"github.com/prometheus/prometheus/util/stats"
|
||||||
|
"github.com/prometheus/prometheus/util/zeropool"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
@ -1794,18 +1795,16 @@ func (ev *evaluator) vectorSelectorSingle(it *storage.MemoizedSeriesIterator, no
|
||||||
return t, v, h, true
|
return t, v, h, true
|
||||||
}
|
}
|
||||||
|
|
||||||
var pointPool = sync.Pool{}
|
var pointPool zeropool.Pool[[]Point]
|
||||||
|
|
||||||
func getPointSlice(sz int) []Point {
|
func getPointSlice(sz int) []Point {
|
||||||
p := pointPool.Get()
|
if p := pointPool.Get(); p != nil {
|
||||||
if p != nil {
|
return p
|
||||||
return p.([]Point)
|
|
||||||
}
|
}
|
||||||
return make([]Point, 0, sz)
|
return make([]Point, 0, sz)
|
||||||
}
|
}
|
||||||
|
|
||||||
func putPointSlice(p []Point) {
|
func putPointSlice(p []Point) {
|
||||||
//nolint:staticcheck // Ignore SA6002 relax staticcheck verification.
|
|
||||||
pointPool.Put(p[:0])
|
pointPool.Put(p[:0])
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
15
tsdb/head.go
15
tsdb/head.go
|
@ -44,6 +44,7 @@ import (
|
||||||
"github.com/prometheus/prometheus/tsdb/tombstones"
|
"github.com/prometheus/prometheus/tsdb/tombstones"
|
||||||
"github.com/prometheus/prometheus/tsdb/tsdbutil"
|
"github.com/prometheus/prometheus/tsdb/tsdbutil"
|
||||||
"github.com/prometheus/prometheus/tsdb/wlog"
|
"github.com/prometheus/prometheus/tsdb/wlog"
|
||||||
|
"github.com/prometheus/prometheus/util/zeropool"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
@ -83,13 +84,13 @@ type Head struct {
|
||||||
exemplarMetrics *ExemplarMetrics
|
exemplarMetrics *ExemplarMetrics
|
||||||
exemplars ExemplarStorage
|
exemplars ExemplarStorage
|
||||||
logger log.Logger
|
logger log.Logger
|
||||||
appendPool sync.Pool
|
appendPool zeropool.Pool[[]record.RefSample]
|
||||||
exemplarsPool sync.Pool
|
exemplarsPool zeropool.Pool[[]exemplarWithSeriesRef]
|
||||||
histogramsPool sync.Pool
|
histogramsPool zeropool.Pool[[]record.RefHistogramSample]
|
||||||
floatHistogramsPool sync.Pool
|
floatHistogramsPool zeropool.Pool[[]record.RefFloatHistogramSample]
|
||||||
metadataPool sync.Pool
|
metadataPool zeropool.Pool[[]record.RefMetadata]
|
||||||
seriesPool sync.Pool
|
seriesPool zeropool.Pool[[]*memSeries]
|
||||||
bytesPool sync.Pool
|
bytesPool zeropool.Pool[[]byte]
|
||||||
memChunkPool sync.Pool
|
memChunkPool sync.Pool
|
||||||
|
|
||||||
// All series addressable by their ID or hash.
|
// All series addressable by their ID or hash.
|
||||||
|
|
|
@ -199,11 +199,10 @@ func (h *Head) getAppendBuffer() []record.RefSample {
|
||||||
if b == nil {
|
if b == nil {
|
||||||
return make([]record.RefSample, 0, 512)
|
return make([]record.RefSample, 0, 512)
|
||||||
}
|
}
|
||||||
return b.([]record.RefSample)
|
return b
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *Head) putAppendBuffer(b []record.RefSample) {
|
func (h *Head) putAppendBuffer(b []record.RefSample) {
|
||||||
//nolint:staticcheck // Ignore SA6002 safe to ignore and actually fixing it has some performance penalty.
|
|
||||||
h.appendPool.Put(b[:0])
|
h.appendPool.Put(b[:0])
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -212,7 +211,7 @@ func (h *Head) getExemplarBuffer() []exemplarWithSeriesRef {
|
||||||
if b == nil {
|
if b == nil {
|
||||||
return make([]exemplarWithSeriesRef, 0, 512)
|
return make([]exemplarWithSeriesRef, 0, 512)
|
||||||
}
|
}
|
||||||
return b.([]exemplarWithSeriesRef)
|
return b
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *Head) putExemplarBuffer(b []exemplarWithSeriesRef) {
|
func (h *Head) putExemplarBuffer(b []exemplarWithSeriesRef) {
|
||||||
|
@ -220,7 +219,6 @@ func (h *Head) putExemplarBuffer(b []exemplarWithSeriesRef) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
//nolint:staticcheck // Ignore SA6002 safe to ignore and actually fixing it has some performance penalty.
|
|
||||||
h.exemplarsPool.Put(b[:0])
|
h.exemplarsPool.Put(b[:0])
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -229,11 +227,10 @@ func (h *Head) getHistogramBuffer() []record.RefHistogramSample {
|
||||||
if b == nil {
|
if b == nil {
|
||||||
return make([]record.RefHistogramSample, 0, 512)
|
return make([]record.RefHistogramSample, 0, 512)
|
||||||
}
|
}
|
||||||
return b.([]record.RefHistogramSample)
|
return b
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *Head) putHistogramBuffer(b []record.RefHistogramSample) {
|
func (h *Head) putHistogramBuffer(b []record.RefHistogramSample) {
|
||||||
//nolint:staticcheck // Ignore SA6002 safe to ignore and actually fixing it has some performance penalty.
|
|
||||||
h.histogramsPool.Put(b[:0])
|
h.histogramsPool.Put(b[:0])
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -242,11 +239,10 @@ func (h *Head) getFloatHistogramBuffer() []record.RefFloatHistogramSample {
|
||||||
if b == nil {
|
if b == nil {
|
||||||
return make([]record.RefFloatHistogramSample, 0, 512)
|
return make([]record.RefFloatHistogramSample, 0, 512)
|
||||||
}
|
}
|
||||||
return b.([]record.RefFloatHistogramSample)
|
return b
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *Head) putFloatHistogramBuffer(b []record.RefFloatHistogramSample) {
|
func (h *Head) putFloatHistogramBuffer(b []record.RefFloatHistogramSample) {
|
||||||
//nolint:staticcheck // Ignore SA6002 safe to ignore and actually fixing it has some performance penalty.
|
|
||||||
h.floatHistogramsPool.Put(b[:0])
|
h.floatHistogramsPool.Put(b[:0])
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -255,11 +251,10 @@ func (h *Head) getMetadataBuffer() []record.RefMetadata {
|
||||||
if b == nil {
|
if b == nil {
|
||||||
return make([]record.RefMetadata, 0, 512)
|
return make([]record.RefMetadata, 0, 512)
|
||||||
}
|
}
|
||||||
return b.([]record.RefMetadata)
|
return b
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *Head) putMetadataBuffer(b []record.RefMetadata) {
|
func (h *Head) putMetadataBuffer(b []record.RefMetadata) {
|
||||||
//nolint:staticcheck // Ignore SA6002 safe to ignore and actually fixing it has some performance penalty.
|
|
||||||
h.metadataPool.Put(b[:0])
|
h.metadataPool.Put(b[:0])
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -268,11 +263,10 @@ func (h *Head) getSeriesBuffer() []*memSeries {
|
||||||
if b == nil {
|
if b == nil {
|
||||||
return make([]*memSeries, 0, 512)
|
return make([]*memSeries, 0, 512)
|
||||||
}
|
}
|
||||||
return b.([]*memSeries)
|
return b
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *Head) putSeriesBuffer(b []*memSeries) {
|
func (h *Head) putSeriesBuffer(b []*memSeries) {
|
||||||
//nolint:staticcheck // Ignore SA6002 safe to ignore and actually fixing it has some performance penalty.
|
|
||||||
h.seriesPool.Put(b[:0])
|
h.seriesPool.Put(b[:0])
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -281,11 +275,10 @@ func (h *Head) getBytesBuffer() []byte {
|
||||||
if b == nil {
|
if b == nil {
|
||||||
return make([]byte, 0, 1024)
|
return make([]byte, 0, 1024)
|
||||||
}
|
}
|
||||||
return b.([]byte)
|
return b
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *Head) putBytesBuffer(b []byte) {
|
func (h *Head) putBytesBuffer(b []byte) {
|
||||||
//nolint:staticcheck // Ignore SA6002 safe to ignore and actually fixing it has some performance penalty.
|
|
||||||
h.bytesPool.Put(b[:0])
|
h.bytesPool.Put(b[:0])
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -79,9 +79,9 @@ func newTestHead(t testing.TB, chunkRange int64, compressWAL, oooEnabled bool) (
|
||||||
func BenchmarkCreateSeries(b *testing.B) {
|
func BenchmarkCreateSeries(b *testing.B) {
|
||||||
series := genSeries(b.N, 10, 0, 0)
|
series := genSeries(b.N, 10, 0, 0)
|
||||||
h, _ := newTestHead(b, 10000, false, false)
|
h, _ := newTestHead(b, 10000, false, false)
|
||||||
defer func() {
|
b.Cleanup(func() {
|
||||||
require.NoError(b, h.Close())
|
require.NoError(b, h.Close())
|
||||||
}()
|
})
|
||||||
|
|
||||||
b.ReportAllocs()
|
b.ReportAllocs()
|
||||||
b.ResetTimer()
|
b.ResetTimer()
|
||||||
|
@ -91,6 +91,49 @@ func BenchmarkCreateSeries(b *testing.B) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func BenchmarkHeadAppender_Append_Commit_ExistingSeries(b *testing.B) {
|
||||||
|
seriesCounts := []int{100, 1000, 10000}
|
||||||
|
series := genSeries(10000, 10, 0, 0)
|
||||||
|
|
||||||
|
for _, seriesCount := range seriesCounts {
|
||||||
|
b.Run(fmt.Sprintf("%d series", seriesCount), func(b *testing.B) {
|
||||||
|
for _, samplesPerAppend := range []int64{1, 2, 5, 100} {
|
||||||
|
b.Run(fmt.Sprintf("%d samples per append", samplesPerAppend), func(b *testing.B) {
|
||||||
|
h, _ := newTestHead(b, 10000, false, false)
|
||||||
|
b.Cleanup(func() { require.NoError(b, h.Close()) })
|
||||||
|
|
||||||
|
ts := int64(1000)
|
||||||
|
append := func() error {
|
||||||
|
var err error
|
||||||
|
app := h.Appender(context.Background())
|
||||||
|
for _, s := range series[:seriesCount] {
|
||||||
|
var ref storage.SeriesRef
|
||||||
|
for sampleIndex := int64(0); sampleIndex < samplesPerAppend; sampleIndex++ {
|
||||||
|
ref, err = app.Append(ref, s.Labels(), ts+sampleIndex, float64(ts+sampleIndex))
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
ts += 1000 // should increment more than highest samplesPerAppend
|
||||||
|
return app.Commit()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Init series, that's not what we're benchmarking here.
|
||||||
|
require.NoError(b, append())
|
||||||
|
|
||||||
|
b.ReportAllocs()
|
||||||
|
b.ResetTimer()
|
||||||
|
|
||||||
|
for i := 0; i < b.N; i++ {
|
||||||
|
require.NoError(b, append())
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func populateTestWAL(t testing.TB, w *wlog.WL, recs []interface{}) {
|
func populateTestWAL(t testing.TB, w *wlog.WL, recs []interface{}) {
|
||||||
var enc record.Encoder
|
var enc record.Encoder
|
||||||
for _, r := range recs {
|
for _, r := range recs {
|
||||||
|
|
|
@ -40,6 +40,7 @@ import (
|
||||||
"github.com/prometheus/prometheus/tsdb/record"
|
"github.com/prometheus/prometheus/tsdb/record"
|
||||||
"github.com/prometheus/prometheus/tsdb/tombstones"
|
"github.com/prometheus/prometheus/tsdb/tombstones"
|
||||||
"github.com/prometheus/prometheus/tsdb/wlog"
|
"github.com/prometheus/prometheus/tsdb/wlog"
|
||||||
|
"github.com/prometheus/prometheus/util/zeropool"
|
||||||
)
|
)
|
||||||
|
|
||||||
// histogramRecord combines both RefHistogramSample and RefFloatHistogramSample
|
// histogramRecord combines both RefHistogramSample and RefFloatHistogramSample
|
||||||
|
@ -74,41 +75,14 @@ func (h *Head) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.
|
||||||
|
|
||||||
decoded = make(chan interface{}, 10)
|
decoded = make(chan interface{}, 10)
|
||||||
decodeErr, seriesCreationErr error
|
decodeErr, seriesCreationErr error
|
||||||
seriesPool = sync.Pool{
|
|
||||||
New: func() interface{} {
|
seriesPool zeropool.Pool[[]record.RefSeries]
|
||||||
return []record.RefSeries{}
|
samplesPool zeropool.Pool[[]record.RefSample]
|
||||||
},
|
tstonesPool zeropool.Pool[[]tombstones.Stone]
|
||||||
}
|
exemplarsPool zeropool.Pool[[]record.RefExemplar]
|
||||||
samplesPool = sync.Pool{
|
histogramsPool zeropool.Pool[[]record.RefHistogramSample]
|
||||||
New: func() interface{} {
|
floatHistogramsPool zeropool.Pool[[]record.RefFloatHistogramSample]
|
||||||
return []record.RefSample{}
|
metadataPool zeropool.Pool[[]record.RefMetadata]
|
||||||
},
|
|
||||||
}
|
|
||||||
tstonesPool = sync.Pool{
|
|
||||||
New: func() interface{} {
|
|
||||||
return []tombstones.Stone{}
|
|
||||||
},
|
|
||||||
}
|
|
||||||
exemplarsPool = sync.Pool{
|
|
||||||
New: func() interface{} {
|
|
||||||
return []record.RefExemplar{}
|
|
||||||
},
|
|
||||||
}
|
|
||||||
histogramsPool = sync.Pool{
|
|
||||||
New: func() interface{} {
|
|
||||||
return []record.RefHistogramSample{}
|
|
||||||
},
|
|
||||||
}
|
|
||||||
floatHistogramsPool = sync.Pool{
|
|
||||||
New: func() interface{} {
|
|
||||||
return []record.RefFloatHistogramSample{}
|
|
||||||
},
|
|
||||||
}
|
|
||||||
metadataPool = sync.Pool{
|
|
||||||
New: func() interface{} {
|
|
||||||
return []record.RefMetadata{}
|
|
||||||
},
|
|
||||||
}
|
|
||||||
)
|
)
|
||||||
|
|
||||||
defer func() {
|
defer func() {
|
||||||
|
@ -167,7 +141,7 @@ func (h *Head) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.
|
||||||
rec := r.Record()
|
rec := r.Record()
|
||||||
switch dec.Type(rec) {
|
switch dec.Type(rec) {
|
||||||
case record.Series:
|
case record.Series:
|
||||||
series := seriesPool.Get().([]record.RefSeries)[:0]
|
series := seriesPool.Get()[:0]
|
||||||
series, err = dec.Series(rec, series)
|
series, err = dec.Series(rec, series)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
decodeErr = &wlog.CorruptionErr{
|
decodeErr = &wlog.CorruptionErr{
|
||||||
|
@ -179,7 +153,7 @@ func (h *Head) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.
|
||||||
}
|
}
|
||||||
decoded <- series
|
decoded <- series
|
||||||
case record.Samples:
|
case record.Samples:
|
||||||
samples := samplesPool.Get().([]record.RefSample)[:0]
|
samples := samplesPool.Get()[:0]
|
||||||
samples, err = dec.Samples(rec, samples)
|
samples, err = dec.Samples(rec, samples)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
decodeErr = &wlog.CorruptionErr{
|
decodeErr = &wlog.CorruptionErr{
|
||||||
|
@ -191,7 +165,7 @@ func (h *Head) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.
|
||||||
}
|
}
|
||||||
decoded <- samples
|
decoded <- samples
|
||||||
case record.Tombstones:
|
case record.Tombstones:
|
||||||
tstones := tstonesPool.Get().([]tombstones.Stone)[:0]
|
tstones := tstonesPool.Get()[:0]
|
||||||
tstones, err = dec.Tombstones(rec, tstones)
|
tstones, err = dec.Tombstones(rec, tstones)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
decodeErr = &wlog.CorruptionErr{
|
decodeErr = &wlog.CorruptionErr{
|
||||||
|
@ -203,7 +177,7 @@ func (h *Head) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.
|
||||||
}
|
}
|
||||||
decoded <- tstones
|
decoded <- tstones
|
||||||
case record.Exemplars:
|
case record.Exemplars:
|
||||||
exemplars := exemplarsPool.Get().([]record.RefExemplar)[:0]
|
exemplars := exemplarsPool.Get()[:0]
|
||||||
exemplars, err = dec.Exemplars(rec, exemplars)
|
exemplars, err = dec.Exemplars(rec, exemplars)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
decodeErr = &wlog.CorruptionErr{
|
decodeErr = &wlog.CorruptionErr{
|
||||||
|
@ -215,7 +189,7 @@ func (h *Head) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.
|
||||||
}
|
}
|
||||||
decoded <- exemplars
|
decoded <- exemplars
|
||||||
case record.HistogramSamples:
|
case record.HistogramSamples:
|
||||||
hists := histogramsPool.Get().([]record.RefHistogramSample)[:0]
|
hists := histogramsPool.Get()[:0]
|
||||||
hists, err = dec.HistogramSamples(rec, hists)
|
hists, err = dec.HistogramSamples(rec, hists)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
decodeErr = &wlog.CorruptionErr{
|
decodeErr = &wlog.CorruptionErr{
|
||||||
|
@ -227,7 +201,7 @@ func (h *Head) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.
|
||||||
}
|
}
|
||||||
decoded <- hists
|
decoded <- hists
|
||||||
case record.FloatHistogramSamples:
|
case record.FloatHistogramSamples:
|
||||||
hists := floatHistogramsPool.Get().([]record.RefFloatHistogramSample)[:0]
|
hists := floatHistogramsPool.Get()[:0]
|
||||||
hists, err = dec.FloatHistogramSamples(rec, hists)
|
hists, err = dec.FloatHistogramSamples(rec, hists)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
decodeErr = &wlog.CorruptionErr{
|
decodeErr = &wlog.CorruptionErr{
|
||||||
|
@ -239,7 +213,7 @@ func (h *Head) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.
|
||||||
}
|
}
|
||||||
decoded <- hists
|
decoded <- hists
|
||||||
case record.Metadata:
|
case record.Metadata:
|
||||||
meta := metadataPool.Get().([]record.RefMetadata)[:0]
|
meta := metadataPool.Get()[:0]
|
||||||
meta, err := dec.Metadata(rec, meta)
|
meta, err := dec.Metadata(rec, meta)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
decodeErr = &wlog.CorruptionErr{
|
decodeErr = &wlog.CorruptionErr{
|
||||||
|
@ -278,7 +252,6 @@ Outer:
|
||||||
idx := uint64(mSeries.ref) % uint64(concurrency)
|
idx := uint64(mSeries.ref) % uint64(concurrency)
|
||||||
processors[idx].input <- walSubsetProcessorInputItem{walSeriesRef: walSeries.Ref, existingSeries: mSeries}
|
processors[idx].input <- walSubsetProcessorInputItem{walSeriesRef: walSeries.Ref, existingSeries: mSeries}
|
||||||
}
|
}
|
||||||
//nolint:staticcheck // Ignore SA6002 relax staticcheck verification.
|
|
||||||
seriesPool.Put(v)
|
seriesPool.Put(v)
|
||||||
case []record.RefSample:
|
case []record.RefSample:
|
||||||
samples := v
|
samples := v
|
||||||
|
@ -315,7 +288,6 @@ Outer:
|
||||||
}
|
}
|
||||||
samples = samples[m:]
|
samples = samples[m:]
|
||||||
}
|
}
|
||||||
//nolint:staticcheck // Ignore SA6002 relax staticcheck verification.
|
|
||||||
samplesPool.Put(v)
|
samplesPool.Put(v)
|
||||||
case []tombstones.Stone:
|
case []tombstones.Stone:
|
||||||
for _, s := range v {
|
for _, s := range v {
|
||||||
|
@ -330,13 +302,11 @@ Outer:
|
||||||
h.tombstones.AddInterval(storage.SeriesRef(s.Ref), itv)
|
h.tombstones.AddInterval(storage.SeriesRef(s.Ref), itv)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
//nolint:staticcheck // Ignore SA6002 relax staticcheck verification.
|
|
||||||
tstonesPool.Put(v)
|
tstonesPool.Put(v)
|
||||||
case []record.RefExemplar:
|
case []record.RefExemplar:
|
||||||
for _, e := range v {
|
for _, e := range v {
|
||||||
exemplarsInput <- e
|
exemplarsInput <- e
|
||||||
}
|
}
|
||||||
//nolint:staticcheck // Ignore SA6002 relax staticcheck verification.
|
|
||||||
exemplarsPool.Put(v)
|
exemplarsPool.Put(v)
|
||||||
case []record.RefHistogramSample:
|
case []record.RefHistogramSample:
|
||||||
samples := v
|
samples := v
|
||||||
|
@ -373,7 +343,6 @@ Outer:
|
||||||
}
|
}
|
||||||
samples = samples[m:]
|
samples = samples[m:]
|
||||||
}
|
}
|
||||||
//nolint:staticcheck // Ignore SA6002 relax staticcheck verification.
|
|
||||||
histogramsPool.Put(v)
|
histogramsPool.Put(v)
|
||||||
case []record.RefFloatHistogramSample:
|
case []record.RefFloatHistogramSample:
|
||||||
samples := v
|
samples := v
|
||||||
|
@ -410,7 +379,6 @@ Outer:
|
||||||
}
|
}
|
||||||
samples = samples[m:]
|
samples = samples[m:]
|
||||||
}
|
}
|
||||||
//nolint:staticcheck // Ignore SA6002 relax staticcheck verification.
|
|
||||||
floatHistogramsPool.Put(v)
|
floatHistogramsPool.Put(v)
|
||||||
case []record.RefMetadata:
|
case []record.RefMetadata:
|
||||||
for _, m := range v {
|
for _, m := range v {
|
||||||
|
@ -425,7 +393,6 @@ Outer:
|
||||||
Help: m.Help,
|
Help: m.Help,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
//nolint:staticcheck // Ignore SA6002 relax staticcheck verification.
|
|
||||||
metadataPool.Put(v)
|
metadataPool.Put(v)
|
||||||
default:
|
default:
|
||||||
panic(fmt.Errorf("unexpected decoded type: %T", d))
|
panic(fmt.Errorf("unexpected decoded type: %T", d))
|
||||||
|
@ -793,7 +760,6 @@ func (h *Head) loadWBL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.
|
||||||
}
|
}
|
||||||
samples = samples[m:]
|
samples = samples[m:]
|
||||||
}
|
}
|
||||||
//nolint:staticcheck // Ignore SA6002 relax staticcheck verification.
|
|
||||||
samplesPool.Put(d)
|
samplesPool.Put(d)
|
||||||
case []record.RefMmapMarker:
|
case []record.RefMmapMarker:
|
||||||
markers := v
|
markers := v
|
||||||
|
|
28
tsdb/wal.go
28
tsdb/wal.go
|
@ -38,6 +38,7 @@ import (
|
||||||
"github.com/prometheus/prometheus/tsdb/record"
|
"github.com/prometheus/prometheus/tsdb/record"
|
||||||
"github.com/prometheus/prometheus/tsdb/tombstones"
|
"github.com/prometheus/prometheus/tsdb/tombstones"
|
||||||
"github.com/prometheus/prometheus/tsdb/wlog"
|
"github.com/prometheus/prometheus/tsdb/wlog"
|
||||||
|
"github.com/prometheus/prometheus/util/zeropool"
|
||||||
)
|
)
|
||||||
|
|
||||||
// WALEntryType indicates what data a WAL entry contains.
|
// WALEntryType indicates what data a WAL entry contains.
|
||||||
|
@ -870,9 +871,9 @@ func (r *walReader) Read(
|
||||||
// Historically, the processing is the bottleneck with reading and decoding using only
|
// Historically, the processing is the bottleneck with reading and decoding using only
|
||||||
// 15% of the CPU.
|
// 15% of the CPU.
|
||||||
var (
|
var (
|
||||||
seriesPool sync.Pool
|
seriesPool zeropool.Pool[[]record.RefSeries]
|
||||||
samplePool sync.Pool
|
samplePool zeropool.Pool[[]record.RefSample]
|
||||||
deletePool sync.Pool
|
deletePool zeropool.Pool[[]tombstones.Stone]
|
||||||
)
|
)
|
||||||
donec := make(chan struct{})
|
donec := make(chan struct{})
|
||||||
datac := make(chan interface{}, 100)
|
datac := make(chan interface{}, 100)
|
||||||
|
@ -886,19 +887,16 @@ func (r *walReader) Read(
|
||||||
if seriesf != nil {
|
if seriesf != nil {
|
||||||
seriesf(v)
|
seriesf(v)
|
||||||
}
|
}
|
||||||
//nolint:staticcheck // Ignore SA6002 safe to ignore and actually fixing it has some performance penalty.
|
|
||||||
seriesPool.Put(v[:0])
|
seriesPool.Put(v[:0])
|
||||||
case []record.RefSample:
|
case []record.RefSample:
|
||||||
if samplesf != nil {
|
if samplesf != nil {
|
||||||
samplesf(v)
|
samplesf(v)
|
||||||
}
|
}
|
||||||
//nolint:staticcheck // Ignore SA6002 safe to ignore and actually fixing it has some performance penalty.
|
|
||||||
samplePool.Put(v[:0])
|
samplePool.Put(v[:0])
|
||||||
case []tombstones.Stone:
|
case []tombstones.Stone:
|
||||||
if deletesf != nil {
|
if deletesf != nil {
|
||||||
deletesf(v)
|
deletesf(v)
|
||||||
}
|
}
|
||||||
//nolint:staticcheck // Ignore SA6002 safe to ignore and actually fixing it has some performance penalty.
|
|
||||||
deletePool.Put(v[:0])
|
deletePool.Put(v[:0])
|
||||||
default:
|
default:
|
||||||
level.Error(r.logger).Log("msg", "unexpected data type")
|
level.Error(r.logger).Log("msg", "unexpected data type")
|
||||||
|
@ -915,11 +913,9 @@ func (r *walReader) Read(
|
||||||
// Those should generally be caught by entry decoding before.
|
// Those should generally be caught by entry decoding before.
|
||||||
switch et {
|
switch et {
|
||||||
case WALEntrySeries:
|
case WALEntrySeries:
|
||||||
var series []record.RefSeries
|
series := seriesPool.Get()
|
||||||
if v := seriesPool.Get(); v == nil {
|
if series == nil {
|
||||||
series = make([]record.RefSeries, 0, 512)
|
series = make([]record.RefSeries, 0, 512)
|
||||||
} else {
|
|
||||||
series = v.([]record.RefSeries)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
err = r.decodeSeries(flag, b, &series)
|
err = r.decodeSeries(flag, b, &series)
|
||||||
|
@ -936,11 +932,9 @@ func (r *walReader) Read(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
case WALEntrySamples:
|
case WALEntrySamples:
|
||||||
var samples []record.RefSample
|
samples := samplePool.Get()
|
||||||
if v := samplePool.Get(); v == nil {
|
if samples == nil {
|
||||||
samples = make([]record.RefSample, 0, 512)
|
samples = make([]record.RefSample, 0, 512)
|
||||||
} else {
|
|
||||||
samples = v.([]record.RefSample)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
err = r.decodeSamples(flag, b, &samples)
|
err = r.decodeSamples(flag, b, &samples)
|
||||||
|
@ -958,11 +952,9 @@ func (r *walReader) Read(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
case WALEntryDeletes:
|
case WALEntryDeletes:
|
||||||
var deletes []tombstones.Stone
|
deletes := deletePool.Get()
|
||||||
if v := deletePool.Get(); v == nil {
|
if deletes == nil {
|
||||||
deletes = make([]tombstones.Stone, 0, 512)
|
deletes = make([]tombstones.Stone, 0, 512)
|
||||||
} else {
|
|
||||||
deletes = v.([]tombstones.Stone)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
err = r.decodeDeletes(flag, b, &deletes)
|
err = r.decodeDeletes(flag, b, &deletes)
|
||||||
|
|
77
util/zeropool/pool.go
Normal file
77
util/zeropool/pool.go
Normal file
|
@ -0,0 +1,77 @@
|
||||||
|
// Copyright 2023 The Prometheus Authors
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
//
|
||||||
|
// Package zeropool provides a zero-allocation type-safe alternative for sync.Pool, used to workaround staticheck SA6002.
|
||||||
|
// The contents of this package are brought from https://github.com/colega/zeropool because "little copying is better than little dependency".
|
||||||
|
|
||||||
|
package zeropool
|
||||||
|
|
||||||
|
import "sync"
|
||||||
|
|
||||||
|
// Pool is a type-safe pool of items that does not allocate pointers to items.
|
||||||
|
// That is not entirely true, it does allocate sometimes, but not most of the time,
|
||||||
|
// just like the usual sync.Pool pools items most of the time, except when they're evicted.
|
||||||
|
// It does that by storing the allocated pointers in a secondary pool instead of letting them go,
|
||||||
|
// so they can be used later to store the items again.
|
||||||
|
//
|
||||||
|
// Zero value of Pool[T] is valid, and it will return zero values of T if nothing is pooled.
|
||||||
|
type Pool[T any] struct {
|
||||||
|
// items holds pointers to the pooled items, which are valid to be used.
|
||||||
|
items sync.Pool
|
||||||
|
// pointers holds just pointers to the pooled item types.
|
||||||
|
// The values referenced by pointers are not valid to be used (as they're used by some other caller)
|
||||||
|
// and it is safe to overwrite these pointers.
|
||||||
|
pointers sync.Pool
|
||||||
|
}
|
||||||
|
|
||||||
|
// New creates a new Pool[T] with the given function to create new items.
|
||||||
|
// A Pool must not be copied after first use.
|
||||||
|
func New[T any](item func() T) Pool[T] {
|
||||||
|
return Pool[T]{
|
||||||
|
items: sync.Pool{
|
||||||
|
New: func() interface{} {
|
||||||
|
val := item()
|
||||||
|
return &val
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get returns an item from the pool, creating a new one if necessary.
|
||||||
|
// Get may be called concurrently from multiple goroutines.
|
||||||
|
func (p *Pool[T]) Get() T {
|
||||||
|
pooled := p.items.Get()
|
||||||
|
if pooled == nil {
|
||||||
|
// The only way this can happen is when someone is using the zero-value of zeropool.Pool, and items pool is empty.
|
||||||
|
// We don't have a pointer to store in p.pointers, so just return the empty value.
|
||||||
|
var zero T
|
||||||
|
return zero
|
||||||
|
}
|
||||||
|
|
||||||
|
ptr := pooled.(*T)
|
||||||
|
item := *ptr // ptr still holds a reference to a copy of item, but nobody will use it.
|
||||||
|
p.pointers.Put(ptr)
|
||||||
|
return item
|
||||||
|
}
|
||||||
|
|
||||||
|
// Put adds an item to the pool.
|
||||||
|
func (p *Pool[T]) Put(item T) {
|
||||||
|
var ptr *T
|
||||||
|
if pooled := p.pointers.Get(); pooled != nil {
|
||||||
|
ptr = pooled.(*T)
|
||||||
|
} else {
|
||||||
|
ptr = new(T)
|
||||||
|
}
|
||||||
|
*ptr = item
|
||||||
|
p.items.Put(ptr)
|
||||||
|
}
|
178
util/zeropool/pool_test.go
Normal file
178
util/zeropool/pool_test.go
Normal file
|
@ -0,0 +1,178 @@
|
||||||
|
// Copyright 2023 The Prometheus Authors
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package zeropool_test
|
||||||
|
|
||||||
|
import (
|
||||||
|
"math"
|
||||||
|
"sync"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
"go.uber.org/atomic"
|
||||||
|
|
||||||
|
"github.com/prometheus/prometheus/util/zeropool"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestPool(t *testing.T) {
|
||||||
|
t.Run("provides correct values", func(t *testing.T) {
|
||||||
|
pool := zeropool.New(func() []byte { return make([]byte, 1024) })
|
||||||
|
item1 := pool.Get()
|
||||||
|
require.Equal(t, 1024, len(item1))
|
||||||
|
|
||||||
|
item2 := pool.Get()
|
||||||
|
require.Equal(t, 1024, len(item2))
|
||||||
|
|
||||||
|
pool.Put(item1)
|
||||||
|
pool.Put(item2)
|
||||||
|
|
||||||
|
item1 = pool.Get()
|
||||||
|
require.Equal(t, 1024, len(item1))
|
||||||
|
|
||||||
|
item2 = pool.Get()
|
||||||
|
require.Equal(t, 1024, len(item2))
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("is not racy", func(t *testing.T) {
|
||||||
|
pool := zeropool.New(func() []byte { return make([]byte, 1024) })
|
||||||
|
|
||||||
|
const iterations = 1e6
|
||||||
|
const concurrency = math.MaxUint8
|
||||||
|
var counter atomic.Int64
|
||||||
|
|
||||||
|
do := make(chan struct{}, 1e6)
|
||||||
|
for i := 0; i < iterations; i++ {
|
||||||
|
do <- struct{}{}
|
||||||
|
}
|
||||||
|
close(do)
|
||||||
|
|
||||||
|
run := make(chan struct{})
|
||||||
|
done := sync.WaitGroup{}
|
||||||
|
done.Add(concurrency)
|
||||||
|
for i := 0; i < concurrency; i++ {
|
||||||
|
go func(worker int) {
|
||||||
|
<-run
|
||||||
|
for range do {
|
||||||
|
item := pool.Get()
|
||||||
|
item[0] = byte(worker)
|
||||||
|
counter.Add(1) // Counts and also adds some delay to add raciness.
|
||||||
|
if item[0] != byte(worker) {
|
||||||
|
panic("wrong value")
|
||||||
|
}
|
||||||
|
pool.Put(item)
|
||||||
|
}
|
||||||
|
done.Done()
|
||||||
|
}(i)
|
||||||
|
}
|
||||||
|
close(run)
|
||||||
|
done.Wait()
|
||||||
|
t.Logf("Done %d iterations", counter.Load())
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("does not allocate", func(t *testing.T) {
|
||||||
|
pool := zeropool.New(func() []byte { return make([]byte, 1024) })
|
||||||
|
// Warm up, this will alloate one slice.
|
||||||
|
slice := pool.Get()
|
||||||
|
pool.Put(slice)
|
||||||
|
|
||||||
|
allocs := testing.AllocsPerRun(1000, func() {
|
||||||
|
slice := pool.Get()
|
||||||
|
pool.Put(slice)
|
||||||
|
})
|
||||||
|
// Don't compare to 0, as when passing all the tests the GC could flush the pools during this test and we would allocate.
|
||||||
|
// Just check that it's less than 1 on average, which is mostly the same thing.
|
||||||
|
require.Less(t, allocs, 1., "Should not allocate.")
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("zero value is valid", func(t *testing.T) {
|
||||||
|
var pool zeropool.Pool[[]byte]
|
||||||
|
slice := pool.Get()
|
||||||
|
pool.Put(slice)
|
||||||
|
|
||||||
|
allocs := testing.AllocsPerRun(1000, func() {
|
||||||
|
slice := pool.Get()
|
||||||
|
pool.Put(slice)
|
||||||
|
})
|
||||||
|
// Don't compare to 0, as when passing all the tests the GC could flush the pools during this test and we would allocate.
|
||||||
|
// Just check that it's less than 1 on average, which is mostly the same thing.
|
||||||
|
require.Less(t, allocs, 1., "Should not allocate.")
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func BenchmarkZeropoolPool(b *testing.B) {
|
||||||
|
pool := zeropool.New(func() []byte { return make([]byte, 1024) })
|
||||||
|
|
||||||
|
// Warmup
|
||||||
|
item := pool.Get()
|
||||||
|
pool.Put(item)
|
||||||
|
|
||||||
|
b.ResetTimer()
|
||||||
|
for i := 0; i < b.N; i++ {
|
||||||
|
item := pool.Get()
|
||||||
|
pool.Put(item)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// BenchmarkSyncPoolValue uses sync.Pool to store values, which makes an allocation on each Put call.
|
||||||
|
func BenchmarkSyncPoolValue(b *testing.B) {
|
||||||
|
pool := sync.Pool{New: func() any {
|
||||||
|
return make([]byte, 1024)
|
||||||
|
}}
|
||||||
|
|
||||||
|
// Warmup
|
||||||
|
item := pool.Get().([]byte)
|
||||||
|
pool.Put(item) //nolint:staticcheck // This allocates.
|
||||||
|
|
||||||
|
b.ResetTimer()
|
||||||
|
for i := 0; i < b.N; i++ {
|
||||||
|
item := pool.Get().([]byte)
|
||||||
|
pool.Put(item) //nolint:staticcheck // This allocates.
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// BenchmarkSyncPoolNewPointer uses sync.Pool to store pointers, but it calls Put with a new pointer every time.
|
||||||
|
func BenchmarkSyncPoolNewPointer(b *testing.B) {
|
||||||
|
pool := sync.Pool{New: func() any {
|
||||||
|
v := make([]byte, 1024)
|
||||||
|
return &v
|
||||||
|
}}
|
||||||
|
|
||||||
|
// Warmup
|
||||||
|
item := pool.Get().(*[]byte)
|
||||||
|
pool.Put(item) //nolint:staticcheck // This allocates.
|
||||||
|
|
||||||
|
b.ResetTimer()
|
||||||
|
for i := 0; i < b.N; i++ {
|
||||||
|
item := pool.Get().(*[]byte)
|
||||||
|
buf := *item
|
||||||
|
pool.Put(&buf) //nolint:staticcheck // New pointer.
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// BenchmarkSyncPoolPointer illustrates the optimal usage of sync.Pool, not always possible.
|
||||||
|
func BenchmarkSyncPoolPointer(b *testing.B) {
|
||||||
|
pool := sync.Pool{New: func() any {
|
||||||
|
v := make([]byte, 1024)
|
||||||
|
return &v
|
||||||
|
}}
|
||||||
|
|
||||||
|
// Warmup
|
||||||
|
item := pool.Get().(*[]byte)
|
||||||
|
pool.Put(item)
|
||||||
|
|
||||||
|
b.ResetTimer()
|
||||||
|
for i := 0; i < b.N; i++ {
|
||||||
|
item := pool.Get().(*[]byte)
|
||||||
|
pool.Put(item)
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in a new issue