From 437f51a85f0199501b6dea8fbac1a9b573958721 Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Fri, 15 Sep 2017 11:08:51 +0200 Subject: [PATCH] Fix cache maintenance on changing metric representations We were not properly maintaining the scrape cache when the same metric was exposed with a different string representation. This overall reduces the scraping cache's complexity, which fixes the issue and saves about 10% of memory in a scraping-only Prometheus instance. --- retrieval/helpers_test.go | 23 +++++++- retrieval/scrape.go | 117 +++++++++++++++----------------------- retrieval/scrape_test.go | 49 ++++++++++++++++ 3 files changed, 115 insertions(+), 74 deletions(-) diff --git a/retrieval/helpers_test.go b/retrieval/helpers_test.go index c8ba8be34..85e63cc4e 100644 --- a/retrieval/helpers_test.go +++ b/retrieval/helpers_test.go @@ -32,13 +32,27 @@ func (a nopAppender) AddFast(labels.Labels, uint64, int64, float64) error { retu func (a nopAppender) Commit() error { return nil } func (a nopAppender) Rollback() error { return nil } +// collectResultAppender records all samples that were added through the appender. +// It can be used as its zero value or be backed by another appender it writes samples through. type collectResultAppender struct { + next storage.Appender result []sample } func (a *collectResultAppender) AddFast(m labels.Labels, ref uint64, t int64, v float64) error { - // Not implemented. 
- return storage.ErrNotFound + if a.next == nil { + return storage.ErrNotFound + } + err := a.next.AddFast(m, ref, t, v) + if err != nil { + return err + } + a.result = append(a.result, sample{ + metric: m, + t: t, + v: v, + }) + return err } func (a *collectResultAppender) Add(m labels.Labels, t int64, v float64) (uint64, error) { @@ -47,7 +61,10 @@ func (a *collectResultAppender) Add(m labels.Labels, t int64, v float64) (uint64 t: t, v: v, }) - return 0, nil + if a.next == nil { + return 0, nil + } + return a.next.Add(m, t, v) } func (a *collectResultAppender) Commit() error { return nil } diff --git a/retrieval/scrape.go b/retrieval/scrape.go index 947701da9..3ccfffa5f 100644 --- a/retrieval/scrape.go +++ b/retrieval/scrape.go @@ -45,13 +45,6 @@ import ( "github.com/prometheus/prometheus/util/httputil" ) -const ( - scrapeHealthMetricName = "up" - scrapeDurationMetricName = "scrape_duration_seconds" - scrapeSamplesMetricName = "scrape_samples_scraped" - samplesPostRelabelMetricName = "scrape_samples_post_metric_relabeling" -) - var ( targetIntervalLength = prometheus.NewSummaryVec( prometheus.SummaryOpts{ @@ -460,15 +453,11 @@ type loop interface { stop() } -type lsetCacheEntry struct { - metric string - lset labels.Labels - hash uint64 -} - -type refEntry struct { +type cacheEntry struct { ref uint64 lastIter uint64 + hash uint64 + lset labels.Labels } type scrapeLoop struct { @@ -494,8 +483,9 @@ type scrapeLoop struct { type scrapeCache struct { iter uint64 // Current scrape iteration. - refs map[string]*refEntry // Parsed string to ref. - lsets map[uint64]*lsetCacheEntry // Ref to labelset and string. + // Parsed string to an entry with information about the actual label set + // and its storage reference. + entries map[string]*cacheEntry // Cache of dropped metric strings and their iteration. 
The iteration must // be a pointer so we can update it without setting a new entry with an unsafe @@ -511,8 +501,7 @@ type scrapeCache struct { func newScrapeCache() *scrapeCache { return &scrapeCache{ - refs: map[string]*refEntry{}, - lsets: map[uint64]*lsetCacheEntry{}, + entries: map[string]*cacheEntry{}, dropped: map[string]*uint64{}, seriesCur: map[uint64]labels.Labels{}, seriesPrev: map[uint64]labels.Labels{}, @@ -523,14 +512,13 @@ func (c *scrapeCache) iterDone() { // refCache and lsetCache may grow over time through series churn // or multiple string representations of the same metric. Clean up entries // that haven't appeared in the last scrape. - for s, e := range c.refs { - if e.lastIter < c.iter { - delete(c.refs, s) - delete(c.lsets, e.ref) + for s, e := range c.entries { + if c.iter-e.lastIter > 2 { + delete(c.entries, s) } } for s, iter := range c.dropped { - if *iter < c.iter { + if c.iter-*iter > 2 { delete(c.dropped, s) } } @@ -546,29 +534,20 @@ func (c *scrapeCache) iterDone() { c.iter++ } -func (c *scrapeCache) getRef(met string) (uint64, bool) { - e, ok := c.refs[met] +func (c *scrapeCache) get(met string) (*cacheEntry, bool) { + e, ok := c.entries[met] if !ok { - return 0, false + return nil, false } e.lastIter = c.iter - return e.ref, true + return e, true } func (c *scrapeCache) addRef(met string, ref uint64, lset labels.Labels, hash uint64) { if ref == 0 { return } - // Clean up the label set cache before overwriting the ref for a previously seen - // metric representation. It won't be caught by the cleanup in iterDone otherwise. - if e, ok := c.refs[met]; ok { - delete(c.lsets, e.ref) - } - c.refs[met] = &refEntry{ref: ref, lastIter: c.iter} - // met is the raw string the metric was ingested as. The label set is not ordered - // and thus it's not suitable to uniquely identify cache entries. - // We store a hash over the label set instead. 
- c.lsets[ref] = &lsetCacheEntry{metric: met, lset: lset, hash: hash} + c.entries[met] = &cacheEntry{ref: ref, lastIter: c.iter, lset: lset, hash: hash} } func (c *scrapeCache) addDropped(met string) { @@ -825,14 +804,12 @@ loop: if sl.cache.getDropped(yoloString(met)) { continue } - ref, ok := sl.cache.getRef(yoloString(met)) + ce, ok := sl.cache.get(yoloString(met)) if ok { - lset := sl.cache.lsets[ref].lset - switch err = app.AddFast(lset, ref, t, v); err { + switch err = app.AddFast(ce.lset, ce.ref, t, v); err { case nil: if tp == nil { - e := sl.cache.lsets[ref] - sl.cache.trackStaleness(e.hash, e.lset) + sl.cache.trackStaleness(ce.hash, ce.lset) } case storage.ErrNotFound: ok = false @@ -862,28 +839,19 @@ loop: } } if !ok { - var ( - lset labels.Labels - mets string - hash uint64 - ) - if e, ok := sl.cache.lsets[ref]; ok { - mets = e.metric - lset = e.lset - hash = e.hash - } else { - mets = p.Metric(&lset) - hash = lset.Hash() + var lset labels.Labels - // Hash label set as it is seen local to the target. Then add target labels - // and relabeling and store the final label set. - lset = sl.sampleMutator(lset) + mets := p.Metric(&lset) + hash := lset.Hash() - // The label set may be set to nil to indicate dropping. - if lset == nil { - sl.cache.addDropped(mets) - continue - } + // Hash label set as it is seen local to the target. Then add target labels + // and relabeling and store the final label set. + lset = sl.sampleMutator(lset) + + // The label set may be set to nil to indicate dropping. + if lset == nil { + sl.cache.addDropped(mets) + continue } var ref uint64 @@ -970,6 +938,15 @@ func yoloString(b []byte) string { return *((*string)(unsafe.Pointer(&b))) } +// The constants are suffixed with the invalid \xff unicode rune to avoid collisions +// with scraped metrics in the cache. 
+const ( + scrapeHealthMetricName = "up" + "\xff" + scrapeDurationMetricName = "scrape_duration_seconds" + "\xff" + scrapeSamplesMetricName = "scrape_samples_scraped" + "\xff" + samplesPostRelabelMetricName = "scrape_samples_post_metric_relabeling" + "\xff" +) + func (sl *scrapeLoop) report(start time.Time, duration time.Duration, scraped, appended int, err error) error { sl.scraper.report(start, duration, err) @@ -1026,14 +1003,9 @@ func (sl *scrapeLoop) reportStale(start time.Time) error { } func (sl *scrapeLoop) addReportSample(app storage.Appender, s string, t int64, v float64) error { - // Suffix s with the invalid \xff unicode rune to avoid collisions - // with scraped metrics. - s2 := s + "\xff" - - ref, ok := sl.cache.getRef(s2) + ce, ok := sl.cache.get(s) if ok { - lset := sl.cache.lsets[ref].lset - err := app.AddFast(lset, ref, t, v) + err := app.AddFast(ce.lset, ce.ref, t, v) switch err { case nil: return nil @@ -1048,7 +1020,10 @@ func (sl *scrapeLoop) addReportSample(app storage.Appender, s string, t int64, v } } lset := labels.Labels{ - labels.Label{Name: labels.MetricName, Value: s}, + // The constants are suffixed with the invalid \xff unicode rune to avoid collisions + // with scraped metrics in the cache. + // We have to drop it when building the actual metric. 
+ labels.Label{Name: labels.MetricName, Value: s[:len(s)-1]}, } hash := lset.Hash() @@ -1057,7 +1032,7 @@ func (sl *scrapeLoop) addReportSample(app storage.Appender, s string, t int64, v ref, err := app.Add(lset, t, v) switch err { case nil: - sl.cache.addRef(s2, ref, lset, hash) + sl.cache.addRef(s, ref, lset, hash) return nil case storage.ErrOutOfOrderSample, storage.ErrDuplicateSampleForTimestamp: return nil diff --git a/retrieval/scrape_test.go b/retrieval/scrape_test.go index 12e366ed3..a9ca87cd2 100644 --- a/retrieval/scrape_test.go +++ b/retrieval/scrape_test.go @@ -37,6 +37,7 @@ import ( "github.com/prometheus/prometheus/pkg/timestamp" "github.com/prometheus/prometheus/pkg/value" "github.com/prometheus/prometheus/storage" + "github.com/prometheus/prometheus/util/testutil" ) func TestNewScrapePool(t *testing.T) { @@ -625,6 +626,54 @@ func TestScrapeLoopAppend(t *testing.T) { } } +func TestScrapeLoop_ChangingMetricString(t *testing.T) { + // This is a regression test for the scrape loop cache not properly maintaining + // IDs when the string representation of a metric changes across a scrape. Thus + // we use a real storage appender here. + s := testutil.NewStorage(t) + defer s.Close() + + app, err := s.Appender() + if err != nil { + t.Error(err) + } + capp := &collectResultAppender{next: app} + + sl := newScrapeLoop(context.Background(), + nil, nil, nil, + nopMutator, + nopMutator, + func() storage.Appender { return capp }, + ) + + now := time.Now() + _, _, err = sl.append([]byte(`metric_a{a="1",b="1"} 1`), now) + if err != nil { + t.Fatalf("Unexpected append error: %s", err) + } + _, _, err = sl.append([]byte(`metric_a{b="1",a="1"} 2`), now.Add(time.Minute)) + if err != nil { + t.Fatalf("Unexpected append error: %s", err) + } + + // Both expositions describe the same series, so both samples must be recorded with the identical, sorted label set. 
+ want := []sample{ + { + metric: labels.FromStrings("__name__", "metric_a", "a", "1", "b", "1"), + t: timestamp.FromTime(now), + v: 1, + }, + { + metric: labels.FromStrings("__name__", "metric_a", "a", "1", "b", "1"), + t: timestamp.FromTime(now.Add(time.Minute)), + v: 2, + }, + } + if !reflect.DeepEqual(want, capp.result) { + t.Fatalf("Appended samples not as expected. Wanted: %+v Got: %+v", want, capp.result) + } +} + func TestScrapeLoopAppendStaleness(t *testing.T) { app := &collectResultAppender{}