Merge pull request #14328 from bboreham/more-dedupelabels
TSDB and scraping: improvements to dedupelabels
This commit is contained in: commit 89608c69a7
@@ -34,6 +34,7 @@ type scrapeMetrics struct {
 	targetScrapePoolExceededTargetLimit prometheus.Counter
 	targetScrapePoolTargetLimit         *prometheus.GaugeVec
 	targetScrapePoolTargetsAdded        *prometheus.GaugeVec
+	targetScrapePoolSymbolTableItems    *prometheus.GaugeVec
 	targetSyncIntervalLength            *prometheus.SummaryVec
 	targetSyncFailed                    *prometheus.CounterVec
@@ -129,6 +130,13 @@ func newScrapeMetrics(reg prometheus.Registerer) (*scrapeMetrics, error) {
 		},
 		[]string{"scrape_job"},
 	)
+	sm.targetScrapePoolSymbolTableItems = prometheus.NewGaugeVec(
+		prometheus.GaugeOpts{
+			Name: "prometheus_target_scrape_pool_symboltable_items",
+			Help: "Current number of symbols in table for this scrape pool.",
+		},
+		[]string{"scrape_job"},
+	)
 	sm.targetScrapePoolSyncsCounter = prometheus.NewCounterVec(
 		prometheus.CounterOpts{
 			Name: "prometheus_target_scrape_pool_sync_total",
@@ -234,6 +242,7 @@ func newScrapeMetrics(reg prometheus.Registerer) (*scrapeMetrics, error) {
 		sm.targetScrapePoolExceededTargetLimit,
 		sm.targetScrapePoolTargetLimit,
 		sm.targetScrapePoolTargetsAdded,
+		sm.targetScrapePoolSymbolTableItems,
 		sm.targetSyncFailed,
 		// Used by targetScraper.
 		sm.targetScrapeExceededBodySizeLimit,
@@ -274,6 +283,7 @@ func (sm *scrapeMetrics) Unregister() {
 	sm.reg.Unregister(sm.targetScrapePoolExceededTargetLimit)
 	sm.reg.Unregister(sm.targetScrapePoolTargetLimit)
 	sm.reg.Unregister(sm.targetScrapePoolTargetsAdded)
+	sm.reg.Unregister(sm.targetScrapePoolSymbolTableItems)
 	sm.reg.Unregister(sm.targetSyncFailed)
 	sm.reg.Unregister(sm.targetScrapeExceededBodySizeLimit)
 	sm.reg.Unregister(sm.targetScrapeCacheFlushForced)
@@ -246,6 +246,7 @@ func (sp *scrapePool) stop() {
 		sp.metrics.targetScrapePoolSyncsCounter.DeleteLabelValues(sp.config.JobName)
 		sp.metrics.targetScrapePoolTargetLimit.DeleteLabelValues(sp.config.JobName)
 		sp.metrics.targetScrapePoolTargetsAdded.DeleteLabelValues(sp.config.JobName)
+		sp.metrics.targetScrapePoolSymbolTableItems.DeleteLabelValues(sp.config.JobName)
 		sp.metrics.targetSyncIntervalLength.DeleteLabelValues(sp.config.JobName)
 		sp.metrics.targetSyncFailed.DeleteLabelValues(sp.config.JobName)
 	}
@@ -273,6 +274,15 @@ func (sp *scrapePool) reload(cfg *config.ScrapeConfig) error {
 
 	sp.metrics.targetScrapePoolTargetLimit.WithLabelValues(sp.config.JobName).Set(float64(sp.config.TargetLimit))
 
+	sp.restartLoops(reuseCache)
+	oldClient.CloseIdleConnections()
+	sp.metrics.targetReloadIntervalLength.WithLabelValues(time.Duration(sp.config.ScrapeInterval).String()).Observe(
+		time.Since(start).Seconds(),
+	)
+	return nil
+}
+
+func (sp *scrapePool) restartLoops(reuseCache bool) {
 	var (
 		wg       sync.WaitGroup
 		interval = time.Duration(sp.config.ScrapeInterval)
@@ -313,7 +323,7 @@ func (sp *scrapePool) reload(cfg *config.ScrapeConfig) error {
 			client:               sp.client,
 			timeout:              timeout,
 			bodySizeLimit:        bodySizeLimit,
-			acceptHeader:         acceptHeader(cfg.ScrapeProtocols),
+			acceptHeader:         acceptHeader(sp.config.ScrapeProtocols),
 			acceptEncodingHeader: acceptEncodingHeader(enableCompression),
 		}
 		newLoop = sp.newLoop(scrapeLoopOptions{
@@ -352,11 +362,10 @@ func (sp *scrapePool) reload(cfg *config.ScrapeConfig) error {
 	sp.targetMtx.Unlock()
 
 	wg.Wait()
-	oldClient.CloseIdleConnections()
-	sp.metrics.targetReloadIntervalLength.WithLabelValues(interval.String()).Observe(
-		time.Since(start).Seconds(),
-	)
+}
 
+// Must be called with sp.mtx held.
+func (sp *scrapePool) checkSymbolTable() {
 	// Here we take steps to clear out the symbol table if it has grown a lot.
 	// After waiting some time for things to settle, we take the size of the symbol-table.
 	// If, after some more time, the table has grown to twice that size, we start a new one.
@@ -367,11 +376,10 @@ func (sp *scrapePool) reload(cfg *config.ScrapeConfig) error {
 		} else if sp.symbolTable.Len() > 2*sp.initialSymbolTableLen {
 			sp.symbolTable = labels.NewSymbolTable()
 			sp.initialSymbolTableLen = 0
+			sp.restartLoops(false) // To drop all caches.
 		}
 		sp.lastSymbolTableCheck = time.Now()
 	}
-
-	return nil
 }
 
 // Sync converts target groups into actual scrape targets and synchronizes
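Note: only fragments of the new checkSymbolTable function are visible in the two hunks above. Read together with the restartLoops refactor, a plausible shape of the whole function is sketched below. The outer time-based guard and the five-minute interval are assumptions; the else-if branch, the restartLoops(false) call, and the lastSymbolTableCheck bookkeeping come directly from the diff.

// Reconstruction only, not the verbatim Prometheus code; the guard condition
// and the checkInterval constant are assumed.
func (sp *scrapePool) checkSymbolTable() {
	// Here we take steps to clear out the symbol table if it has grown a lot.
	// After waiting some time for things to settle, we take the size of the symbol-table.
	// If, after some more time, the table has grown to twice that size, we start a new one.
	const checkInterval = 5 * time.Minute // assumed value
	if time.Since(sp.lastSymbolTableCheck) > checkInterval {
		if sp.initialSymbolTableLen == 0 {
			sp.initialSymbolTableLen = sp.symbolTable.Len() // remember the settled size as a baseline
		} else if sp.symbolTable.Len() > 2*sp.initialSymbolTableLen {
			sp.symbolTable = labels.NewSymbolTable() // table doubled: start a fresh one
			sp.initialSymbolTableLen = 0
			sp.restartLoops(false) // To drop all caches.
		}
		sp.lastSymbolTableCheck = time.Now()
	}
}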
@@ -408,8 +416,10 @@ func (sp *scrapePool) Sync(tgs []*targetgroup.Group) {
 			}
 		}
 	}
+	sp.metrics.targetScrapePoolSymbolTableItems.WithLabelValues(sp.config.JobName).Set(float64(sp.symbolTable.Len()))
 	sp.targetMtx.Unlock()
 	sp.sync(all)
+	sp.checkSymbolTable()
 
 	sp.metrics.targetSyncIntervalLength.WithLabelValues(sp.config.JobName).Observe(
 		time.Since(start).Seconds(),
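Putting the new metric's lifecycle together: it is created and registered in newScrapeMetrics, set to the symbol table's size on every Sync, its label values deleted when the scrape pool stops, and unregistered on shutdown. A minimal standalone sketch of that lifecycle using the Prometheus client library directly; the registry, job name, and value here are illustrative only, not taken from the patch.

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	// Stand-in for the scrape manager's registry.
	reg := prometheus.NewRegistry()

	symbolTableItems := prometheus.NewGaugeVec(
		prometheus.GaugeOpts{
			Name: "prometheus_target_scrape_pool_symboltable_items",
			Help: "Current number of symbols in table for this scrape pool.",
		},
		[]string{"scrape_job"},
	)
	reg.MustRegister(symbolTableItems)

	// On every Sync: publish the current symbol-table size for this job.
	symbolTableItems.WithLabelValues("example_job").Set(1234)

	// When the pool stops: drop this job's series.
	symbolTableItems.DeleteLabelValues("example_job")

	// On shutdown: remove the collector entirely.
	reg.Unregister(symbolTableItems)

	fmt.Println("per-job gauge lifecycle exercised")
}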
@@ -1407,6 +1407,9 @@ func (db *DB) compactHead(head *RangeHead) error {
 	if err = db.head.truncateMemory(head.BlockMaxTime()); err != nil {
 		return fmt.Errorf("head memory truncate: %w", err)
 	}
+
+	db.head.RebuildSymbolTable(db.logger)
+
 	return nil
 }
 
@@ -16,6 +16,9 @@
 package tsdb
 
 import (
+	"github.com/go-kit/log"
+	"github.com/go-kit/log/level"
+
 	"github.com/prometheus/prometheus/model/labels"
 )
 
@@ -25,3 +28,68 @@ func (s *memSeries) labels() labels.Labels {
 	defer s.Unlock()
 	return s.lset
 }
+
+// RebuildSymbolTable goes through all the series in h, build a SymbolTable with all names and values,
+// replace each series' Labels with one using that SymbolTable.
+func (h *Head) RebuildSymbolTable(logger log.Logger) *labels.SymbolTable {
+	level.Info(logger).Log("msg", "RebuildSymbolTable starting")
+	st := labels.NewSymbolTable()
+	builder := labels.NewScratchBuilderWithSymbolTable(st, 0)
+	rebuildLabels := func(lbls labels.Labels) labels.Labels {
+		builder.Reset()
+		lbls.Range(func(l labels.Label) {
+			builder.Add(l.Name, l.Value)
+		})
+		return builder.Labels()
+	}
+
+	for i := 0; i < h.series.size; i++ {
+		h.series.locks[i].Lock()
+
+		for _, s := range h.series.hashes[i].unique {
+			s.Lock()
+			s.lset = rebuildLabels(s.lset)
+			s.Unlock()
+		}
+
+		for _, all := range h.series.hashes[i].conflicts {
+			for _, s := range all {
+				s.Lock()
+				s.lset = rebuildLabels(s.lset)
+				s.Unlock()
+			}
+		}
+
+		h.series.locks[i].Unlock()
+	}
+	type withReset interface{ ResetSymbolTable(*labels.SymbolTable) }
+	if e, ok := h.exemplars.(withReset); ok {
+		e.ResetSymbolTable(st)
+	}
+	level.Info(logger).Log("msg", "RebuildSymbolTable finished", "size", st.Len())
+	return st
+}
+
+func (ce *CircularExemplarStorage) ResetSymbolTable(st *labels.SymbolTable) {
+	builder := labels.NewScratchBuilderWithSymbolTable(st, 0)
+	rebuildLabels := func(lbls labels.Labels) labels.Labels {
+		builder.Reset()
+		lbls.Range(func(l labels.Label) {
+			builder.Add(l.Name, l.Value)
+		})
+		return builder.Labels()
+	}
+
+	ce.lock.RLock()
+	defer ce.lock.RUnlock()
+
+	for _, v := range ce.index {
+		v.seriesLabels = rebuildLabels(v.seriesLabels)
+	}
+	for i := range ce.exemplars {
+		if ce.exemplars[i].ref == nil {
+			continue
+		}
+		ce.exemplars[i].exemplar.Labels = rebuildLabels(ce.exemplars[i].exemplar.Labels)
+	}
+}
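The rebuildLabels closure above is the heart of the rebuild: every label name and value is re-added through a ScratchBuilder bound to the fresh SymbolTable, so all series end up sharing one interned string pool. A minimal standalone sketch of that same pattern outside the Head internals; the input labels here are made up for illustration.

package main

import (
	"fmt"

	"github.com/prometheus/prometheus/model/labels"
)

func main() {
	// Fresh symbol table; with the dedupelabels build tag this deduplicates
	// label strings, otherwise it behaves as a no-op wrapper.
	st := labels.NewSymbolTable()
	builder := labels.NewScratchBuilderWithSymbolTable(st, 0)

	rebuildLabels := func(lbls labels.Labels) labels.Labels {
		builder.Reset()
		lbls.Range(func(l labels.Label) {
			builder.Add(l.Name, l.Value)
		})
		return builder.Labels()
	}

	// Illustrative input, not taken from the patch.
	old := labels.FromStrings("__name__", "http_requests_total", "job", "api")
	rebuilt := rebuildLabels(old)
	fmt.Println(rebuilt.String(), "symbols:", st.Len())
}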
@@ -16,6 +16,8 @@
 package tsdb
 
 import (
+	"github.com/go-kit/log"
+
 	"github.com/prometheus/prometheus/model/labels"
 )
 
@@ -23,3 +25,8 @@ import (
 func (s *memSeries) labels() labels.Labels {
 	return s.lset
 }
+
+// No-op when not using dedupelabels.
+func (h *Head) RebuildSymbolTable(logger log.Logger) *labels.SymbolTable {
+	return nil
+}
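The two RebuildSymbolTable definitions in this commit can coexist only because they live in files selected by opposite build constraints: presumably the full implementation is compiled under the dedupelabels build tag and this stub under its negation. A sketch of that pattern; the constraint lines and file split are assumed and are not shown in the diff.

// File holding the full RebuildSymbolTable (assumed constraint):
//go:build dedupelabels

package tsdb

// File holding the no-op stub that returns nil (assumed constraint):
//go:build !dedupelabels

package tsdb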