Merge branch 'prometheus:main' into fix/stable-cardUI-key
This commit is contained in:
commit dc1b95b74e
@@ -1110,9 +1110,6 @@ func buildDependencyMap(rules []Rule) dependencyMap {
        return dependencies
    }

-   inputs := make(map[string][]Rule, len(rules))
-   outputs := make(map[string][]Rule, len(rules))
-
    var indeterminate bool

    for _, rule := range rules {
@@ -1120,26 +1117,46 @@ func buildDependencyMap(rules []Rule) dependencyMap {
            break
        }

-       name := rule.Name()
-       outputs[name] = append(outputs[name], rule)
-
        parser.Inspect(rule.Query(), func(node parser.Node, path []parser.Node) error {
            if n, ok := node.(*parser.VectorSelector); ok {
+               // Find the name matcher for the rule.
+               var nameMatcher *labels.Matcher
+               if n.Name != "" {
+                   nameMatcher = labels.MustNewMatcher(labels.MatchEqual, model.MetricNameLabel, n.Name)
+               } else {
+                   for _, m := range n.LabelMatchers {
+                       if m.Name == model.MetricNameLabel {
+                           nameMatcher = m
+                           break
+                       }
+                   }
+               }
+
                // A wildcard metric expression means we cannot reliably determine if this rule depends on any other,
                // which means we cannot safely run any rules concurrently.
-               if n.Name == "" && len(n.LabelMatchers) > 0 {
+               if nameMatcher == nil {
                    indeterminate = true
                    return nil
                }

                // Rules which depend on "meta-metrics" like ALERTS and ALERTS_FOR_STATE will have undefined behaviour
                // if they run concurrently.
-               if n.Name == alertMetricName || n.Name == alertForStateMetricName {
+               if nameMatcher.Matches(alertMetricName) || nameMatcher.Matches(alertForStateMetricName) {
                    indeterminate = true
                    return nil
                }

-               inputs[n.Name] = append(inputs[n.Name], rule)
+               // Find rules which depend on the output of this rule.
+               for _, other := range rules {
+                   if other == rule {
+                       continue
+                   }
+
+                   otherName := other.Name()
+                   if nameMatcher.Matches(otherName) {
+                       dependencies[other] = append(dependencies[other], rule)
+                   }
+               }
            }
            return nil
        })
@@ -1149,13 +1166,5 @@ func buildDependencyMap(rules []Rule) dependencyMap {
        return nil
    }

-   for output, outRules := range outputs {
-       for _, outRule := range outRules {
-           if inRules, found := inputs[output]; found && len(inRules) > 0 {
-               dependencies[outRule] = append(dependencies[outRule], inRules...)
-           }
-       }
-   }
-
    return dependencies
}
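To make the matcher-based detection above concrete, here is a small, self-contained sketch (not part of this commit) of how parser.Inspect can walk a rule's query and pull out the __name__ matcher of each vector selector. The query string and the metric name being tested are made up for illustration:

    package main

    import (
        "fmt"

        "github.com/prometheus/common/model"
        "github.com/prometheus/prometheus/model/labels"
        "github.com/prometheus/prometheus/promql/parser"
    )

    func main() {
        // A query in the spirit of the recording rules used by the tests below.
        expr, err := parser.ParseExpr(`sum by (user) ({__name__=~"user:requests.+5m"})`)
        if err != nil {
            panic(err)
        }

        parser.Inspect(expr, func(node parser.Node, _ []parser.Node) error {
            n, ok := node.(*parser.VectorSelector)
            if !ok {
                return nil
            }

            // Same extraction as the diff: prefer an explicit metric name, otherwise
            // look for a __name__ matcher (equality or regex) among the label matchers.
            var nameMatcher *labels.Matcher
            if n.Name != "" {
                nameMatcher = labels.MustNewMatcher(labels.MatchEqual, model.MetricNameLabel, n.Name)
            } else {
                for _, m := range n.LabelMatchers {
                    if m.Name == model.MetricNameLabel {
                        nameMatcher = m
                        break
                    }
                }
            }

            if nameMatcher != nil {
                // Prints: __name__=~"user:requests.+5m" matches "user:requests:rate5m": true
                fmt.Printf("%s matches %q: %v\n", nameMatcher, "user:requests:rate5m", nameMatcher.Matches("user:requests:rate5m"))
            }
            return nil
        })
    }

The rule group uses the resulting dependency map to decide which rules are safe to evaluate concurrently; when no name matcher can be found, the whole group is treated as indeterminate, as the diff above shows.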
@@ -1601,10 +1601,14 @@ func TestDependencyMap(t *testing.T) {
    require.NoError(t, err)
    rule4 := NewRecordingRule("user:requests:increase1h", expr, labels.Labels{})

+   expr, err = parser.ParseExpr(`sum by (user) ({__name__=~"user:requests.+5m"})`)
+   require.NoError(t, err)
+   rule5 := NewRecordingRule("user:requests:sum5m", expr, labels.Labels{})
+
    group := NewGroup(GroupOptions{
        Name: "rule_group",
        Interval: time.Second,
-       Rules: []Rule{rule, rule2, rule3, rule4},
+       Rules: []Rule{rule, rule2, rule3, rule4, rule5},
        Opts: opts,
    })
@@ -1619,13 +1623,17 @@ func TestDependencyMap(t *testing.T) {
    require.Equal(t, []Rule{rule}, depMap.dependencies(rule2))
    require.False(t, depMap.isIndependent(rule2))

-   require.Zero(t, depMap.dependents(rule3))
+   require.Equal(t, []Rule{rule5}, depMap.dependents(rule3))
    require.Zero(t, depMap.dependencies(rule3))
-   require.True(t, depMap.isIndependent(rule3))
+   require.False(t, depMap.isIndependent(rule3))

    require.Zero(t, depMap.dependents(rule4))
    require.Equal(t, []Rule{rule}, depMap.dependencies(rule4))
    require.False(t, depMap.isIndependent(rule4))

+   require.Zero(t, depMap.dependents(rule5))
+   require.Equal(t, []Rule{rule3}, depMap.dependencies(rule5))
+   require.False(t, depMap.isIndependent(rule5))
}

func TestNoDependency(t *testing.T) {
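The new rule5 assertions above hinge on the regex __name__ matcher being anchored and compared against the other rules' names. A quick, hedged sketch (the metric names below are illustrative; the hunk does not show rule3's exact name) of how such a matcher behaves:

    package main

    import (
        "fmt"

        "github.com/prometheus/prometheus/model/labels"
    )

    func main() {
        // rule5 selects {__name__=~"user:requests.+5m"}; Prometheus regex matchers
        // are fully anchored, so the whole metric name must match.
        m := labels.MustNewMatcher(labels.MatchRegexp, "__name__", "user:requests.+5m")

        for _, name := range []string{
            "user:requests:rate5m",     // matches -> would become a dependency
            "user:requests:increase1h", // does not match
            "ALERTS",                   // does not match, so no meta-metric conflict
        } {
            fmt.Printf("%-27s %v\n", name, m.Matches(name))
        }
    }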
@@ -235,6 +235,12 @@ type DB struct {
    appenderPool sync.Pool
    bufPool      sync.Pool

+   // These pools are used during WAL replay.
+   walReplaySeriesPool          zeropool.Pool[[]record.RefSeries]
+   walReplaySamplesPool         zeropool.Pool[[]record.RefSample]
+   walReplayHistogramsPool      zeropool.Pool[[]record.RefHistogramSample]
+   walReplayFloatHistogramsPool zeropool.Pool[[]record.RefFloatHistogramSample]
+
    nextRef *atomic.Uint64
    series  *stripeSeries
    // deleted is a map of (ref IDs that should be deleted from WAL) to (the WAL segment they
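Promoting these pools from local variables (removed in the next hunk) to fields on DB means buffers allocated while replaying one segment can be reused by later replay work instead of being rebuilt from scratch. A rough sketch of that idea using the standard library's sync.Pool rather than the repository's util/zeropool package (the type and method names below are invented for illustration):

    package main

    import (
        "fmt"
        "sync"
    )

    // replayer keeps its scratch-buffer pool on the struct so that capacity
    // survives across calls to replay, mirroring the walReplay*Pool fields above.
    type replayer struct {
        samplesPool sync.Pool
    }

    func (r *replayer) replay(records [][]float64) int {
        n := 0
        for _, rec := range records {
            buf, _ := r.samplesPool.Get().([]float64) // nil the first time around
            buf = append(buf[:0], rec...)             // same shape as pool.Get()[:0] in the diff
            n += len(buf)
            r.samplesPool.Put(buf)
        }
        return n
    }

    func main() {
        r := &replayer{}
        fmt.Println(r.replay([][]float64{{1, 2, 3}, {4, 5}})) // 5
        fmt.Println(r.replay([][]float64{{6}}))               // 1, reusing pooled capacity
    }

The repository's zeropool package largely exists because putting a plain slice into sync.Pool boxes it into an interface and allocates; the control flow, though, is the same Get-truncate-append-Put cycle sketched here.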
@@ -426,11 +432,6 @@ func (db *DB) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H

        decoded = make(chan interface{}, 10)
        errCh   = make(chan error, 1)
-
-       seriesPool          zeropool.Pool[[]record.RefSeries]
-       samplesPool         zeropool.Pool[[]record.RefSample]
-       histogramsPool      zeropool.Pool[[]record.RefHistogramSample]
-       floatHistogramsPool zeropool.Pool[[]record.RefFloatHistogramSample]
    )

    go func() {
@@ -440,7 +441,7 @@ func (db *DB) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H
            rec := r.Record()
            switch dec.Type(rec) {
            case record.Series:
-               series := seriesPool.Get()[:0]
+               series := db.walReplaySeriesPool.Get()[:0]
                series, err = dec.Series(rec, series)
                if err != nil {
                    errCh <- &wlog.CorruptionErr{
@@ -452,7 +453,7 @@ func (db *DB) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H
                }
                decoded <- series
            case record.Samples:
-               samples := samplesPool.Get()[:0]
+               samples := db.walReplaySamplesPool.Get()[:0]
                samples, err = dec.Samples(rec, samples)
                if err != nil {
                    errCh <- &wlog.CorruptionErr{
@@ -464,7 +465,7 @@ func (db *DB) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H
                }
                decoded <- samples
            case record.HistogramSamples, record.CustomBucketsHistogramSamples:
-               histograms := histogramsPool.Get()[:0]
+               histograms := db.walReplayHistogramsPool.Get()[:0]
                histograms, err = dec.HistogramSamples(rec, histograms)
                if err != nil {
                    errCh <- &wlog.CorruptionErr{
@@ -476,7 +477,7 @@ func (db *DB) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H
                }
                decoded <- histograms
            case record.FloatHistogramSamples, record.CustomBucketsFloatHistogramSamples:
-               floatHistograms := floatHistogramsPool.Get()[:0]
+               floatHistograms := db.walReplayFloatHistogramsPool.Get()[:0]
                floatHistograms, err = dec.FloatHistogramSamples(rec, floatHistograms)
                if err != nil {
                    errCh <- &wlog.CorruptionErr{
@@ -521,7 +522,7 @@ func (db *DB) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H
                }
            }
        }
-       seriesPool.Put(v)
+       db.walReplaySeriesPool.Put(v)
    case []record.RefSample:
        for _, entry := range v {
            // Update the lastTs for the series based
@@ -535,7 +536,7 @@ func (db *DB) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H
                series.lastTs = entry.T
            }
        }
-       samplesPool.Put(v)
+       db.walReplaySamplesPool.Put(v)
    case []record.RefHistogramSample:
        for _, entry := range v {
            // Update the lastTs for the series based
@@ -549,7 +550,7 @@ func (db *DB) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H
                series.lastTs = entry.T
            }
        }
-       histogramsPool.Put(v)
+       db.walReplayHistogramsPool.Put(v)
    case []record.RefFloatHistogramSample:
        for _, entry := range v {
            // Update the lastTs for the series based
@@ -563,7 +564,7 @@ func (db *DB) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H
                series.lastTs = entry.T
            }
        }
-       floatHistogramsPool.Put(v)
+       db.walReplayFloatHistogramsPool.Put(v)
    default:
        panic(fmt.Errorf("unexpected decoded type: %T", d))
    }
tsdb/head.go (10 changes)

@@ -94,6 +94,16 @@ type Head struct {
    bytesPool    zeropool.Pool[[]byte]
    memChunkPool sync.Pool

+   // These pools are used during WAL/WBL replay.
+   wlReplaySeriesPool          zeropool.Pool[[]record.RefSeries]
+   wlReplaySamplesPool         zeropool.Pool[[]record.RefSample]
+   wlReplaytStonesPool         zeropool.Pool[[]tombstones.Stone]
+   wlReplayExemplarsPool       zeropool.Pool[[]record.RefExemplar]
+   wlReplayHistogramsPool      zeropool.Pool[[]record.RefHistogramSample]
+   wlReplayFloatHistogramsPool zeropool.Pool[[]record.RefFloatHistogramSample]
+   wlReplayMetadataPool        zeropool.Pool[[]record.RefMetadata]
+   wlReplayMmapMarkersPool     zeropool.Pool[[]record.RefMmapMarker]
+
    // All series addressable by their ID or hash.
    series *stripeSeries
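For readers unfamiliar with the zeropool.Pool[T] fields above: it is a typed pool, so the replay code gets back a []record.RefSeries (or similar) without type assertions. A very rough sketch of such a generic wrapper over sync.Pool (an illustration of the idea only, not the actual util/zeropool implementation):

    package main

    import (
        "fmt"
        "sync"
    )

    // slicePool is a toy generic pool of slices; Head's zeropool.Pool[T] fields
    // play the same role for the record slices decoded during WAL/WBL replay.
    type slicePool[T any] struct {
        p sync.Pool
    }

    // Get returns a zero-length slice, reusing a previously returned backing
    // array when the pool has one.
    func (sp *slicePool[T]) Get() []T {
        if v := sp.p.Get(); v != nil {
            return (*v.(*[]T))[:0]
        }
        return nil
    }

    // Put hands the slice's backing array back for reuse.
    func (sp *slicePool[T]) Put(s []T) {
        sp.p.Put(&s)
    }

    func main() {
        var sp slicePool[int]

        buf := append(sp.Get(), 1, 2, 3)
        sp.Put(buf)

        reused := sp.Get()
        fmt.Println(len(reused), cap(reused)) // 0 and, typically, the previous capacity
    }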
@@ -46,6 +46,7 @@ import (
    "github.com/prometheus/prometheus/storage"
    "github.com/prometheus/prometheus/tsdb/chunkenc"
    "github.com/prometheus/prometheus/tsdb/chunks"
+   "github.com/prometheus/prometheus/tsdb/fileutil"
    "github.com/prometheus/prometheus/tsdb/index"
    "github.com/prometheus/prometheus/tsdb/record"
    "github.com/prometheus/prometheus/tsdb/tombstones"
@@ -440,27 +441,41 @@ func BenchmarkLoadWLs(b *testing.B) {

// BenchmarkLoadRealWLs will be skipped unless the BENCHMARK_LOAD_REAL_WLS_DIR environment variable is set.
// BENCHMARK_LOAD_REAL_WLS_DIR should be the folder where `wal` and `chunks_head` are located.
+//
+// Using an absolute path for BENCHMARK_LOAD_REAL_WLS_DIR is recommended.
+//
+// Because WLs loading may alter BENCHMARK_LOAD_REAL_WLS_DIR which can affect benchmark results and to ensure consistency,
+// a copy of BENCHMARK_LOAD_REAL_WLS_DIR is made for each iteration and deleted at the end.
+// Make sure there is sufficient disk space for that.
func BenchmarkLoadRealWLs(b *testing.B) {
-   dir := os.Getenv("BENCHMARK_LOAD_REAL_WLS_DIR")
-   if dir == "" {
+   srcDir := os.Getenv("BENCHMARK_LOAD_REAL_WLS_DIR")
+   if srcDir == "" {
        b.SkipNow()
    }

-   wal, err := wlog.New(nil, nil, filepath.Join(dir, "wal"), wlog.CompressionNone)
-   require.NoError(b, err)
-   b.Cleanup(func() { wal.Close() })
-
-   wbl, err := wlog.New(nil, nil, filepath.Join(dir, "wbl"), wlog.CompressionNone)
-   require.NoError(b, err)
-   b.Cleanup(func() { wbl.Close() })
-
    // Load the WAL.
    for i := 0; i < b.N; i++ {
+       b.StopTimer()
+       dir := b.TempDir()
+       require.NoError(b, fileutil.CopyDirs(srcDir, dir))
+
+       wal, err := wlog.New(nil, nil, filepath.Join(dir, "wal"), wlog.CompressionNone)
+       require.NoError(b, err)
+       b.Cleanup(func() { wal.Close() })
+
+       wbl, err := wlog.New(nil, nil, filepath.Join(dir, "wbl"), wlog.CompressionNone)
+       require.NoError(b, err)
+       b.Cleanup(func() { wbl.Close() })
+       b.StartTimer()

        opts := DefaultHeadOptions()
        opts.ChunkDirRoot = dir
        h, err := NewHead(nil, nil, wal, wbl, opts, nil)
        require.NoError(b, err)
        require.NoError(b, h.Init(0))

+       b.StopTimer()
+       require.NoError(b, os.RemoveAll(dir))
    }
}
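The benchmark now copies the fixture and reopens the WAL inside the loop but outside the measured time. A standalone, hedged sketch of that StopTimer/StartTimer pattern, with placeholder helpers (copyFixture and replayFixture are not Prometheus APIs) and a made-up environment variable, placed in a _test.go file:

    package replay

    import (
        "os"
        "testing"
    )

    // copyFixture and replayFixture stand in for fileutil.CopyDirs and the WAL
    // replay under test; both are placeholders for illustration.
    func copyFixture(src, dst string) error { return os.MkdirAll(dst, 0o750) }
    func replayFixture(dir string) error    { return nil }

    func BenchmarkReplayFixture(b *testing.B) {
        srcDir := os.Getenv("BENCHMARK_FIXTURE_DIR") // analogous to BENCHMARK_LOAD_REAL_WLS_DIR
        if srcDir == "" {
            b.SkipNow()
        }

        for i := 0; i < b.N; i++ {
            // Per-iteration setup: a fresh copy so one run cannot affect the next,
            // excluded from the timing.
            b.StopTimer()
            dir := b.TempDir()
            if err := copyFixture(srcDir, dir); err != nil {
                b.Fatal(err)
            }
            b.StartTimer()

            // Only the replay itself is measured.
            if err := replayFixture(dir); err != nil {
                b.Fatal(err)
            }

            // Tear-down is excluded as well.
            b.StopTimer()
            if err := os.RemoveAll(dir); err != nil {
                b.Fatal(err)
            }
            b.StartTimer()
        }
    }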
@@ -39,7 +39,6 @@ import (
    "github.com/prometheus/prometheus/tsdb/record"
    "github.com/prometheus/prometheus/tsdb/tombstones"
    "github.com/prometheus/prometheus/tsdb/wlog"
-   "github.com/prometheus/prometheus/util/zeropool"
)

// histogramRecord combines both RefHistogramSample and RefFloatHistogramSample
@@ -73,14 +72,6 @@ func (h *Head) loadWAL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch

        decoded = make(chan interface{}, 10)
        decodeErr, seriesCreationErr error
-
-       seriesPool          zeropool.Pool[[]record.RefSeries]
-       samplesPool         zeropool.Pool[[]record.RefSample]
-       tstonesPool         zeropool.Pool[[]tombstones.Stone]
-       exemplarsPool       zeropool.Pool[[]record.RefExemplar]
-       histogramsPool      zeropool.Pool[[]record.RefHistogramSample]
-       floatHistogramsPool zeropool.Pool[[]record.RefFloatHistogramSample]
-       metadataPool        zeropool.Pool[[]record.RefMetadata]
    )

    defer func() {
@@ -140,7 +131,7 @@ func (h *Head) loadWAL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch
            rec := r.Record()
            switch dec.Type(rec) {
            case record.Series:
-               series := seriesPool.Get()[:0]
+               series := h.wlReplaySeriesPool.Get()[:0]
                series, err = dec.Series(rec, series)
                if err != nil {
                    decodeErr = &wlog.CorruptionErr{
@@ -152,7 +143,7 @@ func (h *Head) loadWAL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch
                }
                decoded <- series
            case record.Samples:
-               samples := samplesPool.Get()[:0]
+               samples := h.wlReplaySamplesPool.Get()[:0]
                samples, err = dec.Samples(rec, samples)
                if err != nil {
                    decodeErr = &wlog.CorruptionErr{
@@ -164,7 +155,7 @@ func (h *Head) loadWAL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch
                }
                decoded <- samples
            case record.Tombstones:
-               tstones := tstonesPool.Get()[:0]
+               tstones := h.wlReplaytStonesPool.Get()[:0]
                tstones, err = dec.Tombstones(rec, tstones)
                if err != nil {
                    decodeErr = &wlog.CorruptionErr{
@@ -176,7 +167,7 @@ func (h *Head) loadWAL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch
                }
                decoded <- tstones
            case record.Exemplars:
-               exemplars := exemplarsPool.Get()[:0]
+               exemplars := h.wlReplayExemplarsPool.Get()[:0]
                exemplars, err = dec.Exemplars(rec, exemplars)
                if err != nil {
                    decodeErr = &wlog.CorruptionErr{
@@ -188,7 +179,7 @@ func (h *Head) loadWAL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch
                }
                decoded <- exemplars
            case record.HistogramSamples, record.CustomBucketsHistogramSamples:
-               hists := histogramsPool.Get()[:0]
+               hists := h.wlReplayHistogramsPool.Get()[:0]
                hists, err = dec.HistogramSamples(rec, hists)
                if err != nil {
                    decodeErr = &wlog.CorruptionErr{
@@ -200,7 +191,7 @@ func (h *Head) loadWAL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch
                }
                decoded <- hists
            case record.FloatHistogramSamples, record.CustomBucketsFloatHistogramSamples:
-               hists := floatHistogramsPool.Get()[:0]
+               hists := h.wlReplayFloatHistogramsPool.Get()[:0]
                hists, err = dec.FloatHistogramSamples(rec, hists)
                if err != nil {
                    decodeErr = &wlog.CorruptionErr{
@@ -212,7 +203,7 @@ func (h *Head) loadWAL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch
                }
                decoded <- hists
            case record.Metadata:
-               meta := metadataPool.Get()[:0]
+               meta := h.wlReplayMetadataPool.Get()[:0]
                meta, err := dec.Metadata(rec, meta)
                if err != nil {
                    decodeErr = &wlog.CorruptionErr{
@@ -251,7 +242,7 @@ Outer:
            idx := uint64(mSeries.ref) % uint64(concurrency)
            processors[idx].input <- walSubsetProcessorInputItem{walSeriesRef: walSeries.Ref, existingSeries: mSeries}
        }
-       seriesPool.Put(v)
+       h.wlReplaySeriesPool.Put(v)
    case []record.RefSample:
        samples := v
        minValidTime := h.minValidTime.Load()
@@ -287,7 +278,7 @@ Outer:
            }
            samples = samples[m:]
        }
-       samplesPool.Put(v)
+       h.wlReplaySamplesPool.Put(v)
    case []tombstones.Stone:
        for _, s := range v {
            for _, itv := range s.Intervals {
@@ -301,12 +292,12 @@ Outer:
                h.tombstones.AddInterval(s.Ref, itv)
            }
        }
-       tstonesPool.Put(v)
+       h.wlReplaytStonesPool.Put(v)
    case []record.RefExemplar:
        for _, e := range v {
            exemplarsInput <- e
        }
-       exemplarsPool.Put(v)
+       h.wlReplayExemplarsPool.Put(v)
    case []record.RefHistogramSample:
        samples := v
        minValidTime := h.minValidTime.Load()
@@ -342,7 +333,7 @@ Outer:
            }
            samples = samples[m:]
        }
-       histogramsPool.Put(v)
+       h.wlReplayHistogramsPool.Put(v)
    case []record.RefFloatHistogramSample:
        samples := v
        minValidTime := h.minValidTime.Load()
@@ -378,7 +369,7 @@ Outer:
            }
            samples = samples[m:]
        }
-       floatHistogramsPool.Put(v)
+       h.wlReplayFloatHistogramsPool.Put(v)
    case []record.RefMetadata:
        for _, m := range v {
            s := h.series.getByID(m.Ref)
@@ -392,7 +383,7 @@ Outer:
                Help: m.Help,
            }
        }
-       metadataPool.Put(v)
+       h.wlReplayMetadataPool.Put(v)
    default:
        panic(fmt.Errorf("unexpected decoded type: %T", d))
    }
@@ -659,12 +650,8 @@ func (h *Head) loadWBL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch
        shards          = make([][]record.RefSample, concurrency)
        histogramShards = make([][]histogramRecord, concurrency)

-       decodedCh                 = make(chan interface{}, 10)
-       decodeErr                 error
-       samplesPool               zeropool.Pool[[]record.RefSample]
-       markersPool               zeropool.Pool[[]record.RefMmapMarker]
-       histogramSamplesPool      zeropool.Pool[[]record.RefHistogramSample]
-       floatHistogramSamplesPool zeropool.Pool[[]record.RefFloatHistogramSample]
+       decodedCh = make(chan interface{}, 10)
+       decodeErr error
    )

    defer func() {
@@ -700,7 +687,7 @@ func (h *Head) loadWBL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch
            rec := r.Record()
            switch dec.Type(rec) {
            case record.Samples:
-               samples := samplesPool.Get()[:0]
+               samples := h.wlReplaySamplesPool.Get()[:0]
                samples, err = dec.Samples(rec, samples)
                if err != nil {
                    decodeErr = &wlog.CorruptionErr{
@@ -712,7 +699,7 @@ func (h *Head) loadWBL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch
                }
                decodedCh <- samples
            case record.MmapMarkers:
-               markers := markersPool.Get()[:0]
+               markers := h.wlReplayMmapMarkersPool.Get()[:0]
                markers, err = dec.MmapMarkers(rec, markers)
                if err != nil {
                    decodeErr = &wlog.CorruptionErr{
@@ -724,7 +711,7 @@ func (h *Head) loadWBL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch
                }
                decodedCh <- markers
            case record.HistogramSamples, record.CustomBucketsHistogramSamples:
-               hists := histogramSamplesPool.Get()[:0]
+               hists := h.wlReplayHistogramsPool.Get()[:0]
                hists, err = dec.HistogramSamples(rec, hists)
                if err != nil {
                    decodeErr = &wlog.CorruptionErr{
@@ -736,7 +723,7 @@ func (h *Head) loadWBL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch
                }
                decodedCh <- hists
            case record.FloatHistogramSamples, record.CustomBucketsFloatHistogramSamples:
-               hists := floatHistogramSamplesPool.Get()[:0]
+               hists := h.wlReplayFloatHistogramsPool.Get()[:0]
                hists, err = dec.FloatHistogramSamples(rec, hists)
                if err != nil {
                    decodeErr = &wlog.CorruptionErr{
@@ -787,7 +774,7 @@ func (h *Head) loadWBL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch
            }
            samples = samples[m:]
        }
-       samplesPool.Put(v)
+       h.wlReplaySamplesPool.Put(v)
    case []record.RefMmapMarker:
        markers := v
        for _, rm := range markers {
@@ -842,7 +829,7 @@ func (h *Head) loadWBL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch
            }
            samples = samples[m:]
        }
-       histogramSamplesPool.Put(v)
+       h.wlReplayHistogramsPool.Put(v)
    case []record.RefFloatHistogramSample:
        samples := v
        // We split up the samples into chunks of 5000 samples or less.
@@ -874,7 +861,7 @@ func (h *Head) loadWBL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch
            }
            samples = samples[m:]
        }
-       floatHistogramSamplesPool.Put(v)
+       h.wlReplayFloatHistogramsPool.Put(v)
    default:
        panic(fmt.Errorf("unexpected decodedCh type: %T", d))
    }
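Stepping back, loadWAL and loadWBL share one shape: a producer goroutine decodes records into pooled slices and sends them over a buffered channel, and the consumer applies each slice and returns it to the pool. A compact, hedged sketch of that pipeline with simplified stand-in types (no real decoder or record package involved):

    package main

    import (
        "fmt"
        "sync"
    )

    type sample struct {
        ref int
        t   int64
        v   float64
    }

    var samplesPool = sync.Pool{New: func() any { return []sample(nil) }}

    func main() {
        records := [][]sample{
            {{ref: 1, t: 100, v: 1.5}},
            {{ref: 1, t: 200, v: 2.5}, {ref: 2, t: 200, v: 7}},
        }

        decoded := make(chan []sample, 10)

        // Producer: decode each record into a pooled slice and hand it over.
        go func() {
            defer close(decoded)
            for _, rec := range records {
                buf := samplesPool.Get().([]sample)[:0]
                buf = append(buf, rec...) // stands in for dec.Samples(rec, buf)
                decoded <- buf
            }
        }()

        // Consumer: apply the samples, then return the slice for reuse.
        lastTs := map[int]int64{}
        for buf := range decoded {
            for _, s := range buf {
                lastTs[s.ref] = s.t
            }
            samplesPool.Put(buf)
        }
        fmt.Println(lastTs) // map[1:200 2:200]
    }

Keeping the pools on Head rather than local to each call means the buffers in this loop can also be reused when replay runs again, for example for the WBL after the WAL.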