diff --git a/scrape/scrape.go b/scrape/scrape.go
index 2039a7b82..979fa4cf6 100644
--- a/scrape/scrape.go
+++ b/scrape/scrape.go
@@ -125,6 +125,12 @@ var (
 			Help: "Total number of samples rejected due to timestamp falling outside of the time bounds",
 		},
 	)
+	targetScrapeCacheFlushForced = prometheus.NewCounter(
+		prometheus.CounterOpts{
+			Name: "prometheus_target_scrapes_cache_flush_forced_total",
+			Help: "How many times a scrape cache was flushed due to getting big while scrapes are failing.",
+		},
+	)
 )

 func init() {
@@ -140,6 +146,7 @@ func init() {
 	prometheus.MustRegister(targetScrapeSampleDuplicate)
 	prometheus.MustRegister(targetScrapeSampleOutOfOrder)
 	prometheus.MustRegister(targetScrapeSampleOutOfBounds)
+	prometheus.MustRegister(targetScrapeCacheFlushForced)
 }

 // scrapePool manages scrapes for sets of targets.
@@ -606,6 +613,9 @@ type scrapeLoop struct {
 type scrapeCache struct {
 	iter uint64 // Current scrape iteration.

+	// How many series and metadata entries there were at the last success.
+	successfulCount int
+
 	// Parsed string to an entry with information about the actual label set
 	// and its storage reference.
 	series map[string]*cacheEntry
@@ -643,8 +653,24 @@ func newScrapeCache() *scrapeCache {
 	}
 }

-func (c *scrapeCache) iterDone(cleanCache bool) {
-	if cleanCache {
+func (c *scrapeCache) iterDone(flushCache bool) {
+	c.metaMtx.Lock()
+	count := len(c.series) + len(c.droppedSeries) + len(c.metadata)
+	c.metaMtx.Unlock()
+
+	if flushCache {
+		c.successfulCount = count
+	} else if count > c.successfulCount*2+1000 {
+		// If a target had varying labels in scrapes that ultimately failed,
+		// the caches would grow indefinitely. Force a flush when this happens.
+		// We use the heuristic that this is a doubling of the cache size
+		// since the last scrape, and allow an additional 1000 in case
+		// initial scrapes all fail.
+		flushCache = true
+		targetScrapeCacheFlushForced.Inc()
+	}
+
+	if flushCache {
 		// All caches may grow over time through series churn
 		// or multiple string representations of the same metric. Clean up entries
 		// that haven't appeared in the last scrape.
@@ -1185,6 +1211,8 @@ loop:
 		return total, added, err
 	}

+	// Only perform cache cleaning if the scrape was not empty.
+	// An empty scrape (usually) is used to indicate a failed scrape.
 	sl.cache.iterDone(len(b) > 0)

 	return total, added, nil
diff --git a/scrape/scrape_test.go b/scrape/scrape_test.go
index 64a211996..bb5f51c4b 100644
--- a/scrape/scrape_test.go
+++ b/scrape/scrape_test.go
@@ -859,6 +859,66 @@ func TestScrapeLoopCache(t *testing.T) {
 	}
 }

+func TestScrapeLoopCacheMemoryExhaustionProtection(t *testing.T) {
+	s := testutil.NewStorage(t)
+	defer s.Close()
+
+	sapp, err := s.Appender()
+	if err != nil {
+		t.Error(err)
+	}
+	appender := &collectResultAppender{next: sapp}
+	var (
+		signal  = make(chan struct{})
+		scraper = &testScraper{}
+		app     = func() storage.Appender { return appender }
+	)
+	defer close(signal)
+
+	ctx, cancel := context.WithCancel(context.Background())
+	sl := newScrapeLoop(ctx,
+		scraper,
+		nil, nil,
+		nopMutator,
+		nopMutator,
+		app,
+		nil,
+		0,
+		true,
+	)
+
+	numScrapes := 0
+
+	scraper.scrapeFunc = func(ctx context.Context, w io.Writer) error {
+		numScrapes++
+		if numScrapes < 5 {
+			s := ""
+			for i := 0; i < 500; i++ {
+				s = fmt.Sprintf("%smetric_%d_%d 42\n", s, i, numScrapes)
+			}
+			w.Write([]byte(fmt.Sprintf(s + "&")))
+		} else {
+			cancel()
+		}
+		return nil
+	}
+
+	go func() {
+		sl.run(10*time.Millisecond, time.Hour, nil)
+		signal <- struct{}{}
+	}()
+
+	select {
+	case <-signal:
+	case <-time.After(5 * time.Second):
+		t.Fatalf("Scrape wasn't stopped.")
+	}
+
+	if len(sl.cache.series) > 2000 {
+		t.Fatalf("More than 2000 series cached. Got: %d", len(sl.cache.series))
+	}
+}
+
 func TestScrapeLoopAppend(t *testing.T) {
 	tests := []struct {
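Note (not part of the diff): the test above grows the cache by serving 500 fresh series per scrape and appending a stray "&" so each scrape fails to parse, which previously let the cache grow without bound. The snippet below is a minimal standalone sketch of the forced-flush heuristic from scrapeCache.iterDone, pulled out for illustration only; the function name shouldFlush is hypothetical and does not exist in the codebase.

package main

import "fmt"

// shouldFlush reports whether a cache holding count entries should be
// force-flushed, given the entry count recorded at the last successful
// scrape. The cache is flushed once it has more than doubled since that
// success, with a 1000-entry allowance for the case where the initial
// scrapes all fail and no success has been recorded yet.
func shouldFlush(count, successfulCount int) bool {
	return count > successfulCount*2+1000
}

func main() {
	fmt.Println(shouldFlush(1500, 0))    // true: no success yet, cache past the 1000-entry allowance
	fmt.Println(shouldFlush(3000, 2000)) // false: still below 2*2000+1000
	fmt.Println(shouldFlush(5500, 2000)) // true: more than doubled since the last successful scrape
}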