mirror of
https://github.com/prometheus/prometheus.git
synced 2024-11-09 23:24:05 -08:00
Fix cache maintenance on changing metric representations
We were not properly maintaining the scrape cache when the same metric was exposed with a different string representation. This overall reduces the scraping cache's complexity, which fixes the issue and saves about 10% of memory in a scraping-only Prometheus instance.
This commit is contained in:
parent
a04be0bc1c
commit
437f51a85f
|
@ -32,13 +32,27 @@ func (a nopAppender) AddFast(labels.Labels, uint64, int64, float64) error { retu
|
|||
func (a nopAppender) Commit() error { return nil }
|
||||
func (a nopAppender) Rollback() error { return nil }
|
||||
|
||||
// collectResultAppender records all samples that were added through the appender.
|
||||
// It can be used as its zero value or be backed by another appender it writes samples through.
|
||||
type collectResultAppender struct {
|
||||
next storage.Appender
|
||||
result []sample
|
||||
}
|
||||
|
||||
func (a *collectResultAppender) AddFast(m labels.Labels, ref uint64, t int64, v float64) error {
|
||||
// Not implemented.
|
||||
if a.next == nil {
|
||||
return storage.ErrNotFound
|
||||
}
|
||||
err := a.next.AddFast(m, ref, t, v)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
a.result = append(a.result, sample{
|
||||
metric: m,
|
||||
t: t,
|
||||
v: v,
|
||||
})
|
||||
return err
|
||||
}
|
||||
|
||||
func (a *collectResultAppender) Add(m labels.Labels, t int64, v float64) (uint64, error) {
|
||||
|
@ -47,7 +61,10 @@ func (a *collectResultAppender) Add(m labels.Labels, t int64, v float64) (uint64
|
|||
t: t,
|
||||
v: v,
|
||||
})
|
||||
if a.next == nil {
|
||||
return 0, nil
|
||||
}
|
||||
return a.next.Add(m, t, v)
|
||||
}
|
||||
|
||||
func (a *collectResultAppender) Commit() error { return nil }
|
||||
|
|
|
@ -45,13 +45,6 @@ import (
|
|||
"github.com/prometheus/prometheus/util/httputil"
|
||||
)
|
||||
|
||||
const (
|
||||
scrapeHealthMetricName = "up"
|
||||
scrapeDurationMetricName = "scrape_duration_seconds"
|
||||
scrapeSamplesMetricName = "scrape_samples_scraped"
|
||||
samplesPostRelabelMetricName = "scrape_samples_post_metric_relabeling"
|
||||
)
|
||||
|
||||
var (
|
||||
targetIntervalLength = prometheus.NewSummaryVec(
|
||||
prometheus.SummaryOpts{
|
||||
|
@ -460,15 +453,11 @@ type loop interface {
|
|||
stop()
|
||||
}
|
||||
|
||||
type lsetCacheEntry struct {
|
||||
metric string
|
||||
lset labels.Labels
|
||||
hash uint64
|
||||
}
|
||||
|
||||
type refEntry struct {
|
||||
type cacheEntry struct {
|
||||
ref uint64
|
||||
lastIter uint64
|
||||
hash uint64
|
||||
lset labels.Labels
|
||||
}
|
||||
|
||||
type scrapeLoop struct {
|
||||
|
@ -494,8 +483,9 @@ type scrapeLoop struct {
|
|||
type scrapeCache struct {
|
||||
iter uint64 // Current scrape iteration.
|
||||
|
||||
refs map[string]*refEntry // Parsed string to ref.
|
||||
lsets map[uint64]*lsetCacheEntry // Ref to labelset and string.
|
||||
// Parsed string to an entry with information about the actual label set
|
||||
// and its storage reference.
|
||||
entries map[string]*cacheEntry
|
||||
|
||||
// Cache of dropped metric strings and their iteration. The iteration must
|
||||
// be a pointer so we can update it without setting a new entry with an unsafe
|
||||
|
@ -511,8 +501,7 @@ type scrapeCache struct {
|
|||
|
||||
func newScrapeCache() *scrapeCache {
|
||||
return &scrapeCache{
|
||||
refs: map[string]*refEntry{},
|
||||
lsets: map[uint64]*lsetCacheEntry{},
|
||||
entries: map[string]*cacheEntry{},
|
||||
dropped: map[string]*uint64{},
|
||||
seriesCur: map[uint64]labels.Labels{},
|
||||
seriesPrev: map[uint64]labels.Labels{},
|
||||
|
@ -523,14 +512,13 @@ func (c *scrapeCache) iterDone() {
|
|||
// refCache and lsetCache may grow over time through series churn
|
||||
// or multiple string representations of the same metric. Clean up entries
|
||||
// that haven't appeared in the last scrape.
|
||||
for s, e := range c.refs {
|
||||
if e.lastIter < c.iter {
|
||||
delete(c.refs, s)
|
||||
delete(c.lsets, e.ref)
|
||||
for s, e := range c.entries {
|
||||
if c.iter-e.lastIter > 2 {
|
||||
delete(c.entries, s)
|
||||
}
|
||||
}
|
||||
for s, iter := range c.dropped {
|
||||
if *iter < c.iter {
|
||||
if c.iter-*iter > 2 {
|
||||
delete(c.dropped, s)
|
||||
}
|
||||
}
|
||||
|
@ -546,29 +534,20 @@ func (c *scrapeCache) iterDone() {
|
|||
c.iter++
|
||||
}
|
||||
|
||||
func (c *scrapeCache) getRef(met string) (uint64, bool) {
|
||||
e, ok := c.refs[met]
|
||||
func (c *scrapeCache) get(met string) (*cacheEntry, bool) {
|
||||
e, ok := c.entries[met]
|
||||
if !ok {
|
||||
return 0, false
|
||||
return nil, false
|
||||
}
|
||||
e.lastIter = c.iter
|
||||
return e.ref, true
|
||||
return e, true
|
||||
}
|
||||
|
||||
func (c *scrapeCache) addRef(met string, ref uint64, lset labels.Labels, hash uint64) {
|
||||
if ref == 0 {
|
||||
return
|
||||
}
|
||||
// Clean up the label set cache before overwriting the ref for a previously seen
|
||||
// metric representation. It won't be caught by the cleanup in iterDone otherwise.
|
||||
if e, ok := c.refs[met]; ok {
|
||||
delete(c.lsets, e.ref)
|
||||
}
|
||||
c.refs[met] = &refEntry{ref: ref, lastIter: c.iter}
|
||||
// met is the raw string the metric was ingested as. The label set is not ordered
|
||||
// and thus it's not suitable to uniquely identify cache entries.
|
||||
// We store a hash over the label set instead.
|
||||
c.lsets[ref] = &lsetCacheEntry{metric: met, lset: lset, hash: hash}
|
||||
c.entries[met] = &cacheEntry{ref: ref, lastIter: c.iter, lset: lset, hash: hash}
|
||||
}
|
||||
|
||||
func (c *scrapeCache) addDropped(met string) {
|
||||
|
@ -825,14 +804,12 @@ loop:
|
|||
if sl.cache.getDropped(yoloString(met)) {
|
||||
continue
|
||||
}
|
||||
ref, ok := sl.cache.getRef(yoloString(met))
|
||||
ce, ok := sl.cache.get(yoloString(met))
|
||||
if ok {
|
||||
lset := sl.cache.lsets[ref].lset
|
||||
switch err = app.AddFast(lset, ref, t, v); err {
|
||||
switch err = app.AddFast(ce.lset, ce.ref, t, v); err {
|
||||
case nil:
|
||||
if tp == nil {
|
||||
e := sl.cache.lsets[ref]
|
||||
sl.cache.trackStaleness(e.hash, e.lset)
|
||||
sl.cache.trackStaleness(ce.hash, ce.lset)
|
||||
}
|
||||
case storage.ErrNotFound:
|
||||
ok = false
|
||||
|
@ -862,18 +839,10 @@ loop:
|
|||
}
|
||||
}
|
||||
if !ok {
|
||||
var (
|
||||
lset labels.Labels
|
||||
mets string
|
||||
hash uint64
|
||||
)
|
||||
if e, ok := sl.cache.lsets[ref]; ok {
|
||||
mets = e.metric
|
||||
lset = e.lset
|
||||
hash = e.hash
|
||||
} else {
|
||||
mets = p.Metric(&lset)
|
||||
hash = lset.Hash()
|
||||
var lset labels.Labels
|
||||
|
||||
mets := p.Metric(&lset)
|
||||
hash := lset.Hash()
|
||||
|
||||
// Hash label set as it is seen local to the target. Then add target labels
|
||||
// and relabeling and store the final label set.
|
||||
|
@ -884,7 +853,6 @@ loop:
|
|||
sl.cache.addDropped(mets)
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
var ref uint64
|
||||
ref, err = app.Add(lset, t, v)
|
||||
|
@ -970,6 +938,15 @@ func yoloString(b []byte) string {
|
|||
return *((*string)(unsafe.Pointer(&b)))
|
||||
}
|
||||
|
||||
// The constants are suffixed with the invalid \xff unicode rune to avoid collisions
|
||||
// with scraped metrics in the cache.
|
||||
const (
|
||||
scrapeHealthMetricName = "up" + "\xff"
|
||||
scrapeDurationMetricName = "scrape_duration_seconds" + "\xff"
|
||||
scrapeSamplesMetricName = "scrape_samples_scraped" + "\xff"
|
||||
samplesPostRelabelMetricName = "scrape_samples_post_metric_relabeling" + "\xff"
|
||||
)
|
||||
|
||||
func (sl *scrapeLoop) report(start time.Time, duration time.Duration, scraped, appended int, err error) error {
|
||||
sl.scraper.report(start, duration, err)
|
||||
|
||||
|
@ -1026,14 +1003,9 @@ func (sl *scrapeLoop) reportStale(start time.Time) error {
|
|||
}
|
||||
|
||||
func (sl *scrapeLoop) addReportSample(app storage.Appender, s string, t int64, v float64) error {
|
||||
// Suffix s with the invalid \xff unicode rune to avoid collisions
|
||||
// with scraped metrics.
|
||||
s2 := s + "\xff"
|
||||
|
||||
ref, ok := sl.cache.getRef(s2)
|
||||
ce, ok := sl.cache.get(s)
|
||||
if ok {
|
||||
lset := sl.cache.lsets[ref].lset
|
||||
err := app.AddFast(lset, ref, t, v)
|
||||
err := app.AddFast(ce.lset, ce.ref, t, v)
|
||||
switch err {
|
||||
case nil:
|
||||
return nil
|
||||
|
@ -1048,7 +1020,10 @@ func (sl *scrapeLoop) addReportSample(app storage.Appender, s string, t int64, v
|
|||
}
|
||||
}
|
||||
lset := labels.Labels{
|
||||
labels.Label{Name: labels.MetricName, Value: s},
|
||||
// The constants are suffixed with the invalid \xff unicode rune to avoid collisions
|
||||
// with scraped metrics in the cache.
|
||||
// We have to drop it when building the actual metric.
|
||||
labels.Label{Name: labels.MetricName, Value: s[:len(s)-1]},
|
||||
}
|
||||
|
||||
hash := lset.Hash()
|
||||
|
@ -1057,7 +1032,7 @@ func (sl *scrapeLoop) addReportSample(app storage.Appender, s string, t int64, v
|
|||
ref, err := app.Add(lset, t, v)
|
||||
switch err {
|
||||
case nil:
|
||||
sl.cache.addRef(s2, ref, lset, hash)
|
||||
sl.cache.addRef(s, ref, lset, hash)
|
||||
return nil
|
||||
case storage.ErrOutOfOrderSample, storage.ErrDuplicateSampleForTimestamp:
|
||||
return nil
|
||||
|
|
|
@ -37,6 +37,7 @@ import (
|
|||
"github.com/prometheus/prometheus/pkg/timestamp"
|
||||
"github.com/prometheus/prometheus/pkg/value"
|
||||
"github.com/prometheus/prometheus/storage"
|
||||
"github.com/prometheus/prometheus/util/testutil"
|
||||
)
|
||||
|
||||
func TestNewScrapePool(t *testing.T) {
|
||||
|
@ -625,6 +626,54 @@ func TestScrapeLoopAppend(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestScrapeLoop_ChangingMetricString(t *testing.T) {
|
||||
// This is a regression test for the scrape loop cache not properly maintaining
|
||||
// IDs when the string representation of a metric changes across a scrape. Thus
|
||||
// we use a real storage appender here.
|
||||
s := testutil.NewStorage(t)
|
||||
defer s.Close()
|
||||
|
||||
app, err := s.Appender()
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
capp := &collectResultAppender{next: app}
|
||||
|
||||
sl := newScrapeLoop(context.Background(),
|
||||
nil, nil, nil,
|
||||
nopMutator,
|
||||
nopMutator,
|
||||
func() storage.Appender { return capp },
|
||||
)
|
||||
|
||||
now := time.Now()
|
||||
_, _, err = sl.append([]byte(`metric_a{a="1",b="1"} 1`), now)
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected append error: %s", err)
|
||||
}
|
||||
_, _, err = sl.append([]byte(`metric_a{b="1",a="1"} 2`), now.Add(time.Minute))
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected append error: %s", err)
|
||||
}
|
||||
|
||||
// DeepEqual will report NaNs as being different, so replace with a different value.
|
||||
want := []sample{
|
||||
{
|
||||
metric: labels.FromStrings("__name__", "metric_a", "a", "1", "b", "1"),
|
||||
t: timestamp.FromTime(now),
|
||||
v: 1,
|
||||
},
|
||||
{
|
||||
metric: labels.FromStrings("__name__", "metric_a", "a", "1", "b", "1"),
|
||||
t: timestamp.FromTime(now.Add(time.Minute)),
|
||||
v: 2,
|
||||
},
|
||||
}
|
||||
if !reflect.DeepEqual(want, capp.result) {
|
||||
t.Fatalf("Appended samples not as expected. Wanted: %+v Got: %+v", want, capp.result)
|
||||
}
|
||||
}
|
||||
|
||||
func TestScrapeLoopAppendStaleness(t *testing.T) {
|
||||
app := &collectResultAppender{}
|
||||
|
||||
|
|
Loading…
Reference in a new issue