diff --git a/rules/group.go b/rules/group.go index 9ad9aab093..a83c33a8e8 100644 --- a/rules/group.go +++ b/rules/group.go @@ -1110,9 +1110,6 @@ func buildDependencyMap(rules []Rule) dependencyMap { return dependencies } - inputs := make(map[string][]Rule, len(rules)) - outputs := make(map[string][]Rule, len(rules)) - var indeterminate bool for _, rule := range rules { @@ -1120,26 +1117,46 @@ func buildDependencyMap(rules []Rule) dependencyMap { break } - name := rule.Name() - outputs[name] = append(outputs[name], rule) - parser.Inspect(rule.Query(), func(node parser.Node, path []parser.Node) error { if n, ok := node.(*parser.VectorSelector); ok { + // Find the name matcher for the rule. + var nameMatcher *labels.Matcher + if n.Name != "" { + nameMatcher = labels.MustNewMatcher(labels.MatchEqual, model.MetricNameLabel, n.Name) + } else { + for _, m := range n.LabelMatchers { + if m.Name == model.MetricNameLabel { + nameMatcher = m + break + } + } + } + // A wildcard metric expression means we cannot reliably determine if this rule depends on any other, // which means we cannot safely run any rules concurrently. - if n.Name == "" && len(n.LabelMatchers) > 0 { + if nameMatcher == nil { indeterminate = true return nil } // Rules which depend on "meta-metrics" like ALERTS and ALERTS_FOR_STATE will have undefined behaviour // if they run concurrently. - if n.Name == alertMetricName || n.Name == alertForStateMetricName { + if nameMatcher.Matches(alertMetricName) || nameMatcher.Matches(alertForStateMetricName) { indeterminate = true return nil } - inputs[n.Name] = append(inputs[n.Name], rule) + // Find rules which depend on the output of this rule. + for _, other := range rules { + if other == rule { + continue + } + + otherName := other.Name() + if nameMatcher.Matches(otherName) { + dependencies[other] = append(dependencies[other], rule) + } + } } return nil }) @@ -1149,13 +1166,5 @@ func buildDependencyMap(rules []Rule) dependencyMap { return nil } - for output, outRules := range outputs { - for _, outRule := range outRules { - if inRules, found := inputs[output]; found && len(inRules) > 0 { - dependencies[outRule] = append(dependencies[outRule], inRules...) - } - } - } - return dependencies } diff --git a/rules/manager_test.go b/rules/manager_test.go index 6c32b6d0f1..9ef1fcf074 100644 --- a/rules/manager_test.go +++ b/rules/manager_test.go @@ -1601,10 +1601,14 @@ func TestDependencyMap(t *testing.T) { require.NoError(t, err) rule4 := NewRecordingRule("user:requests:increase1h", expr, labels.Labels{}) + expr, err = parser.ParseExpr(`sum by (user) ({__name__=~"user:requests.+5m"})`) + require.NoError(t, err) + rule5 := NewRecordingRule("user:requests:sum5m", expr, labels.Labels{}) + group := NewGroup(GroupOptions{ Name: "rule_group", Interval: time.Second, - Rules: []Rule{rule, rule2, rule3, rule4}, + Rules: []Rule{rule, rule2, rule3, rule4, rule5}, Opts: opts, }) @@ -1619,13 +1623,17 @@ func TestDependencyMap(t *testing.T) { require.Equal(t, []Rule{rule}, depMap.dependencies(rule2)) require.False(t, depMap.isIndependent(rule2)) - require.Zero(t, depMap.dependents(rule3)) + require.Equal(t, []Rule{rule5}, depMap.dependents(rule3)) require.Zero(t, depMap.dependencies(rule3)) - require.True(t, depMap.isIndependent(rule3)) + require.False(t, depMap.isIndependent(rule3)) require.Zero(t, depMap.dependents(rule4)) require.Equal(t, []Rule{rule}, depMap.dependencies(rule4)) require.False(t, depMap.isIndependent(rule4)) + + require.Zero(t, depMap.dependents(rule5)) + require.Equal(t, []Rule{rule3}, depMap.dependencies(rule5)) + require.False(t, depMap.isIndependent(rule5)) } func TestNoDependency(t *testing.T) { diff --git a/tsdb/agent/db.go b/tsdb/agent/db.go index 1f876f4e1e..cf4a977288 100644 --- a/tsdb/agent/db.go +++ b/tsdb/agent/db.go @@ -235,6 +235,12 @@ type DB struct { appenderPool sync.Pool bufPool sync.Pool + // These pools are used during WAL replay. + walReplaySeriesPool zeropool.Pool[[]record.RefSeries] + walReplaySamplesPool zeropool.Pool[[]record.RefSample] + walReplayHistogramsPool zeropool.Pool[[]record.RefHistogramSample] + walReplayFloatHistogramsPool zeropool.Pool[[]record.RefFloatHistogramSample] + nextRef *atomic.Uint64 series *stripeSeries // deleted is a map of (ref IDs that should be deleted from WAL) to (the WAL segment they @@ -426,11 +432,6 @@ func (db *DB) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H decoded = make(chan interface{}, 10) errCh = make(chan error, 1) - - seriesPool zeropool.Pool[[]record.RefSeries] - samplesPool zeropool.Pool[[]record.RefSample] - histogramsPool zeropool.Pool[[]record.RefHistogramSample] - floatHistogramsPool zeropool.Pool[[]record.RefFloatHistogramSample] ) go func() { @@ -440,7 +441,7 @@ func (db *DB) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H rec := r.Record() switch dec.Type(rec) { case record.Series: - series := seriesPool.Get()[:0] + series := db.walReplaySeriesPool.Get()[:0] series, err = dec.Series(rec, series) if err != nil { errCh <- &wlog.CorruptionErr{ @@ -452,7 +453,7 @@ func (db *DB) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H } decoded <- series case record.Samples: - samples := samplesPool.Get()[:0] + samples := db.walReplaySamplesPool.Get()[:0] samples, err = dec.Samples(rec, samples) if err != nil { errCh <- &wlog.CorruptionErr{ @@ -464,7 +465,7 @@ func (db *DB) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H } decoded <- samples case record.HistogramSamples, record.CustomBucketsHistogramSamples: - histograms := histogramsPool.Get()[:0] + histograms := db.walReplayHistogramsPool.Get()[:0] histograms, err = dec.HistogramSamples(rec, histograms) if err != nil { errCh <- &wlog.CorruptionErr{ @@ -476,7 +477,7 @@ func (db *DB) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H } decoded <- histograms case record.FloatHistogramSamples, record.CustomBucketsFloatHistogramSamples: - floatHistograms := floatHistogramsPool.Get()[:0] + floatHistograms := db.walReplayFloatHistogramsPool.Get()[:0] floatHistograms, err = dec.FloatHistogramSamples(rec, floatHistograms) if err != nil { errCh <- &wlog.CorruptionErr{ @@ -521,7 +522,7 @@ func (db *DB) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H } } } - seriesPool.Put(v) + db.walReplaySeriesPool.Put(v) case []record.RefSample: for _, entry := range v { // Update the lastTs for the series based @@ -535,7 +536,7 @@ func (db *DB) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H series.lastTs = entry.T } } - samplesPool.Put(v) + db.walReplaySamplesPool.Put(v) case []record.RefHistogramSample: for _, entry := range v { // Update the lastTs for the series based @@ -549,7 +550,7 @@ func (db *DB) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H series.lastTs = entry.T } } - histogramsPool.Put(v) + db.walReplayHistogramsPool.Put(v) case []record.RefFloatHistogramSample: for _, entry := range v { // Update the lastTs for the series based @@ -563,7 +564,7 @@ func (db *DB) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H series.lastTs = entry.T } } - floatHistogramsPool.Put(v) + db.walReplayFloatHistogramsPool.Put(v) default: panic(fmt.Errorf("unexpected decoded type: %T", d)) } diff --git a/tsdb/head.go b/tsdb/head.go index 4fbb4b5710..6f9a287aa0 100644 --- a/tsdb/head.go +++ b/tsdb/head.go @@ -94,6 +94,16 @@ type Head struct { bytesPool zeropool.Pool[[]byte] memChunkPool sync.Pool + // These pools are used during WAL/WBL replay. + wlReplaySeriesPool zeropool.Pool[[]record.RefSeries] + wlReplaySamplesPool zeropool.Pool[[]record.RefSample] + wlReplaytStonesPool zeropool.Pool[[]tombstones.Stone] + wlReplayExemplarsPool zeropool.Pool[[]record.RefExemplar] + wlReplayHistogramsPool zeropool.Pool[[]record.RefHistogramSample] + wlReplayFloatHistogramsPool zeropool.Pool[[]record.RefFloatHistogramSample] + wlReplayMetadataPool zeropool.Pool[[]record.RefMetadata] + wlReplayMmapMarkersPool zeropool.Pool[[]record.RefMmapMarker] + // All series addressable by their ID or hash. series *stripeSeries diff --git a/tsdb/head_test.go b/tsdb/head_test.go index 33b54a756f..e498578c10 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -46,6 +46,7 @@ import ( "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb/chunkenc" "github.com/prometheus/prometheus/tsdb/chunks" + "github.com/prometheus/prometheus/tsdb/fileutil" "github.com/prometheus/prometheus/tsdb/index" "github.com/prometheus/prometheus/tsdb/record" "github.com/prometheus/prometheus/tsdb/tombstones" @@ -440,27 +441,41 @@ func BenchmarkLoadWLs(b *testing.B) { // BenchmarkLoadRealWLs will be skipped unless the BENCHMARK_LOAD_REAL_WLS_DIR environment variable is set. // BENCHMARK_LOAD_REAL_WLS_DIR should be the folder where `wal` and `chunks_head` are located. +// +// Using an absolute path for BENCHMARK_LOAD_REAL_WLS_DIR is recommended. +// +// Because WLs loading may alter BENCHMARK_LOAD_REAL_WLS_DIR which can affect benchmark results and to ensure consistency, +// a copy of BENCHMARK_LOAD_REAL_WLS_DIR is made for each iteration and deleted at the end. +// Make sure there is sufficient disk space for that. func BenchmarkLoadRealWLs(b *testing.B) { - dir := os.Getenv("BENCHMARK_LOAD_REAL_WLS_DIR") - if dir == "" { + srcDir := os.Getenv("BENCHMARK_LOAD_REAL_WLS_DIR") + if srcDir == "" { b.SkipNow() } - wal, err := wlog.New(nil, nil, filepath.Join(dir, "wal"), wlog.CompressionNone) - require.NoError(b, err) - b.Cleanup(func() { wal.Close() }) - - wbl, err := wlog.New(nil, nil, filepath.Join(dir, "wbl"), wlog.CompressionNone) - require.NoError(b, err) - b.Cleanup(func() { wbl.Close() }) - // Load the WAL. for i := 0; i < b.N; i++ { + b.StopTimer() + dir := b.TempDir() + require.NoError(b, fileutil.CopyDirs(srcDir, dir)) + + wal, err := wlog.New(nil, nil, filepath.Join(dir, "wal"), wlog.CompressionNone) + require.NoError(b, err) + b.Cleanup(func() { wal.Close() }) + + wbl, err := wlog.New(nil, nil, filepath.Join(dir, "wbl"), wlog.CompressionNone) + require.NoError(b, err) + b.Cleanup(func() { wbl.Close() }) + b.StartTimer() + opts := DefaultHeadOptions() opts.ChunkDirRoot = dir h, err := NewHead(nil, nil, wal, wbl, opts, nil) require.NoError(b, err) require.NoError(b, h.Init(0)) + + b.StopTimer() + require.NoError(b, os.RemoveAll(dir)) } } diff --git a/tsdb/head_wal.go b/tsdb/head_wal.go index 0afe84a875..ad03fa4766 100644 --- a/tsdb/head_wal.go +++ b/tsdb/head_wal.go @@ -39,7 +39,6 @@ import ( "github.com/prometheus/prometheus/tsdb/record" "github.com/prometheus/prometheus/tsdb/tombstones" "github.com/prometheus/prometheus/tsdb/wlog" - "github.com/prometheus/prometheus/util/zeropool" ) // histogramRecord combines both RefHistogramSample and RefFloatHistogramSample @@ -73,14 +72,6 @@ func (h *Head) loadWAL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch decoded = make(chan interface{}, 10) decodeErr, seriesCreationErr error - - seriesPool zeropool.Pool[[]record.RefSeries] - samplesPool zeropool.Pool[[]record.RefSample] - tstonesPool zeropool.Pool[[]tombstones.Stone] - exemplarsPool zeropool.Pool[[]record.RefExemplar] - histogramsPool zeropool.Pool[[]record.RefHistogramSample] - floatHistogramsPool zeropool.Pool[[]record.RefFloatHistogramSample] - metadataPool zeropool.Pool[[]record.RefMetadata] ) defer func() { @@ -140,7 +131,7 @@ func (h *Head) loadWAL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch rec := r.Record() switch dec.Type(rec) { case record.Series: - series := seriesPool.Get()[:0] + series := h.wlReplaySeriesPool.Get()[:0] series, err = dec.Series(rec, series) if err != nil { decodeErr = &wlog.CorruptionErr{ @@ -152,7 +143,7 @@ func (h *Head) loadWAL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch } decoded <- series case record.Samples: - samples := samplesPool.Get()[:0] + samples := h.wlReplaySamplesPool.Get()[:0] samples, err = dec.Samples(rec, samples) if err != nil { decodeErr = &wlog.CorruptionErr{ @@ -164,7 +155,7 @@ func (h *Head) loadWAL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch } decoded <- samples case record.Tombstones: - tstones := tstonesPool.Get()[:0] + tstones := h.wlReplaytStonesPool.Get()[:0] tstones, err = dec.Tombstones(rec, tstones) if err != nil { decodeErr = &wlog.CorruptionErr{ @@ -176,7 +167,7 @@ func (h *Head) loadWAL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch } decoded <- tstones case record.Exemplars: - exemplars := exemplarsPool.Get()[:0] + exemplars := h.wlReplayExemplarsPool.Get()[:0] exemplars, err = dec.Exemplars(rec, exemplars) if err != nil { decodeErr = &wlog.CorruptionErr{ @@ -188,7 +179,7 @@ func (h *Head) loadWAL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch } decoded <- exemplars case record.HistogramSamples, record.CustomBucketsHistogramSamples: - hists := histogramsPool.Get()[:0] + hists := h.wlReplayHistogramsPool.Get()[:0] hists, err = dec.HistogramSamples(rec, hists) if err != nil { decodeErr = &wlog.CorruptionErr{ @@ -200,7 +191,7 @@ func (h *Head) loadWAL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch } decoded <- hists case record.FloatHistogramSamples, record.CustomBucketsFloatHistogramSamples: - hists := floatHistogramsPool.Get()[:0] + hists := h.wlReplayFloatHistogramsPool.Get()[:0] hists, err = dec.FloatHistogramSamples(rec, hists) if err != nil { decodeErr = &wlog.CorruptionErr{ @@ -212,7 +203,7 @@ func (h *Head) loadWAL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch } decoded <- hists case record.Metadata: - meta := metadataPool.Get()[:0] + meta := h.wlReplayMetadataPool.Get()[:0] meta, err := dec.Metadata(rec, meta) if err != nil { decodeErr = &wlog.CorruptionErr{ @@ -251,7 +242,7 @@ Outer: idx := uint64(mSeries.ref) % uint64(concurrency) processors[idx].input <- walSubsetProcessorInputItem{walSeriesRef: walSeries.Ref, existingSeries: mSeries} } - seriesPool.Put(v) + h.wlReplaySeriesPool.Put(v) case []record.RefSample: samples := v minValidTime := h.minValidTime.Load() @@ -287,7 +278,7 @@ Outer: } samples = samples[m:] } - samplesPool.Put(v) + h.wlReplaySamplesPool.Put(v) case []tombstones.Stone: for _, s := range v { for _, itv := range s.Intervals { @@ -301,12 +292,12 @@ Outer: h.tombstones.AddInterval(s.Ref, itv) } } - tstonesPool.Put(v) + h.wlReplaytStonesPool.Put(v) case []record.RefExemplar: for _, e := range v { exemplarsInput <- e } - exemplarsPool.Put(v) + h.wlReplayExemplarsPool.Put(v) case []record.RefHistogramSample: samples := v minValidTime := h.minValidTime.Load() @@ -342,7 +333,7 @@ Outer: } samples = samples[m:] } - histogramsPool.Put(v) + h.wlReplayHistogramsPool.Put(v) case []record.RefFloatHistogramSample: samples := v minValidTime := h.minValidTime.Load() @@ -378,7 +369,7 @@ Outer: } samples = samples[m:] } - floatHistogramsPool.Put(v) + h.wlReplayFloatHistogramsPool.Put(v) case []record.RefMetadata: for _, m := range v { s := h.series.getByID(m.Ref) @@ -392,7 +383,7 @@ Outer: Help: m.Help, } } - metadataPool.Put(v) + h.wlReplayMetadataPool.Put(v) default: panic(fmt.Errorf("unexpected decoded type: %T", d)) } @@ -659,12 +650,8 @@ func (h *Head) loadWBL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch shards = make([][]record.RefSample, concurrency) histogramShards = make([][]histogramRecord, concurrency) - decodedCh = make(chan interface{}, 10) - decodeErr error - samplesPool zeropool.Pool[[]record.RefSample] - markersPool zeropool.Pool[[]record.RefMmapMarker] - histogramSamplesPool zeropool.Pool[[]record.RefHistogramSample] - floatHistogramSamplesPool zeropool.Pool[[]record.RefFloatHistogramSample] + decodedCh = make(chan interface{}, 10) + decodeErr error ) defer func() { @@ -700,7 +687,7 @@ func (h *Head) loadWBL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch rec := r.Record() switch dec.Type(rec) { case record.Samples: - samples := samplesPool.Get()[:0] + samples := h.wlReplaySamplesPool.Get()[:0] samples, err = dec.Samples(rec, samples) if err != nil { decodeErr = &wlog.CorruptionErr{ @@ -712,7 +699,7 @@ func (h *Head) loadWBL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch } decodedCh <- samples case record.MmapMarkers: - markers := markersPool.Get()[:0] + markers := h.wlReplayMmapMarkersPool.Get()[:0] markers, err = dec.MmapMarkers(rec, markers) if err != nil { decodeErr = &wlog.CorruptionErr{ @@ -724,7 +711,7 @@ func (h *Head) loadWBL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch } decodedCh <- markers case record.HistogramSamples, record.CustomBucketsHistogramSamples: - hists := histogramSamplesPool.Get()[:0] + hists := h.wlReplayHistogramsPool.Get()[:0] hists, err = dec.HistogramSamples(rec, hists) if err != nil { decodeErr = &wlog.CorruptionErr{ @@ -736,7 +723,7 @@ func (h *Head) loadWBL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch } decodedCh <- hists case record.FloatHistogramSamples, record.CustomBucketsFloatHistogramSamples: - hists := floatHistogramSamplesPool.Get()[:0] + hists := h.wlReplayFloatHistogramsPool.Get()[:0] hists, err = dec.FloatHistogramSamples(rec, hists) if err != nil { decodeErr = &wlog.CorruptionErr{ @@ -787,7 +774,7 @@ func (h *Head) loadWBL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch } samples = samples[m:] } - samplesPool.Put(v) + h.wlReplaySamplesPool.Put(v) case []record.RefMmapMarker: markers := v for _, rm := range markers { @@ -842,7 +829,7 @@ func (h *Head) loadWBL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch } samples = samples[m:] } - histogramSamplesPool.Put(v) + h.wlReplayHistogramsPool.Put(v) case []record.RefFloatHistogramSample: samples := v // We split up the samples into chunks of 5000 samples or less. @@ -874,7 +861,7 @@ func (h *Head) loadWBL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch } samples = samples[m:] } - floatHistogramSamplesPool.Put(v) + h.wlReplayFloatHistogramsPool.Put(v) default: panic(fmt.Errorf("unexpected decodedCh type: %T", d)) }