From 4f35952cf3fa6160da47161cbc508376b347ae2f Mon Sep 17 00:00:00 2001 From: Brian Brazil Date: Thu, 13 Apr 2017 18:07:23 +0100 Subject: [PATCH] Inject a stale NaN when sample disappears between scrapes. --- retrieval/scrape.go | 49 ++++++++++++++++++++++++++++++++-------- retrieval/scrape_test.go | 5 ++-- 2 files changed, 43 insertions(+), 11 deletions(-) diff --git a/retrieval/scrape.go b/retrieval/scrape.go index 8258892b8..4cdcf5a28 100644 --- a/retrieval/scrape.go +++ b/retrieval/scrape.go @@ -19,6 +19,7 @@ import ( "compress/gzip" "fmt" "io" + "math" "net/http" "sync" "time" @@ -35,6 +36,7 @@ import ( "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/pkg/textparse" "github.com/prometheus/prometheus/pkg/timestamp" + "github.com/prometheus/prometheus/pkg/value" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/util/httputil" ) @@ -410,13 +412,20 @@ type loop interface { stop() } +type lsetCacheEntry struct { + lset labels.Labels + str string +} + type scrapeLoop struct { scraper scraper appender func() storage.Appender reportAppender func() storage.Appender - cache map[string]uint64 + refCache map[string]uint64 // Parsed string to ref. + lsetCache map[uint64]lsetCacheEntry // Ref to labelset and string + samplesInPreviousScrape map[string]labels.Labels done chan struct{} ctx context.Context @@ -428,7 +437,8 @@ func newScrapeLoop(ctx context.Context, sc scraper, app, reportApp func() storag scraper: sc, appender: app, reportAppender: reportApp, - cache: map[string]uint64{}, + refCache: map[string]uint64{}, + lsetCache: map[uint64]lsetCacheEntry{}, done: make(chan struct{}), } sl.ctx, sl.cancel = context.WithCancel(ctx) @@ -525,9 +535,10 @@ func (s samples) Less(i, j int) bool { func (sl *scrapeLoop) append(b []byte, ts time.Time) (total, added int, err error) { var ( - app = sl.appender() - p = textparse.New(b) - defTime = timestamp.FromTime(ts) + app = sl.appender() + p = textparse.New(b) + defTime = timestamp.FromTime(ts) + samplesScraped = map[string]labels.Labels{} ) loop: @@ -541,10 +552,11 @@ loop: } mets := yoloString(met) - ref, ok := sl.cache[mets] + ref, ok := sl.refCache[mets] if ok { switch err = app.AddFast(ref, t, v); err { case nil: + samplesScraped[sl.lsetCache[ref].str] = sl.lsetCache[ref].lset case storage.ErrNotFound: ok = false case errSeriesDropped: @@ -568,13 +580,31 @@ loop: } // Allocate a real string. mets = string(met) - sl.cache[mets] = ref + sl.refCache[mets] = ref + str := lset.String() + sl.lsetCache[ref] = lsetCacheEntry{lset: lset, str: str} + samplesScraped[str] = lset } added++ } if err == nil { err = p.Err() } + if err == nil { + for metric, lset := range sl.samplesInPreviousScrape { + if _, ok := samplesScraped[metric]; !ok { + // Sample no longer exposed, mark it stale. + _, err = app.Add(lset, defTime, math.Float64frombits(value.StaleNaN)) + switch err { + case nil: + case errSeriesDropped: + continue + default: + break + } + } + } + } if err != nil { app.Rollback() return total, 0, err @@ -582,6 +612,7 @@ loop: if err := app.Commit(); err != nil { return total, 0, err } + sl.samplesInPreviousScrape = samplesScraped return total, added, nil } @@ -621,7 +652,7 @@ func (sl *scrapeLoop) report(start time.Time, duration time.Duration, scraped, a } func (sl *scrapeLoop) addReportSample(app storage.Appender, s string, t int64, v float64) error { - ref, ok := sl.cache[s] + ref, ok := sl.refCache[s] if ok { if err := app.AddFast(ref, t, v); err == nil { @@ -637,7 +668,7 @@ func (sl *scrapeLoop) addReportSample(app storage.Appender, s string, t int64, v if err != nil { return err } - sl.cache[s] = ref + sl.refCache[s] = ref return nil } diff --git a/retrieval/scrape_test.go b/retrieval/scrape_test.go index 23097215b..7ebac1e3e 100644 --- a/retrieval/scrape_test.go +++ b/retrieval/scrape_test.go @@ -18,10 +18,10 @@ import ( "fmt" "io" "io/ioutil" + "math" "net/http" "net/http/httptest" "net/url" - "math" "reflect" "strings" "sync" @@ -442,7 +442,8 @@ func TestScrapeLoopAppend(t *testing.T) { sl := &scrapeLoop{ appender: func() storage.Appender { return app }, reportAppender: func() storage.Appender { return nopAppender{} }, - cache: map[string]uint64{}, + refCache: map[string]uint64{}, + lsetCache: map[uint64]lsetCacheEntry{}, } now := time.Now()