Add stalemarkers to synthetic series too when target stops.

This commit is contained in:
Brian Brazil 2017-05-11 14:43:43 +01:00
parent b87d3ca9ea
commit d532272520
3 changed files with 61 additions and 8 deletions

View file

@ -38,7 +38,7 @@ type collectResultAppender struct {
func (a *collectResultAppender) AddFast(ref uint64, t int64, v float64) error { func (a *collectResultAppender) AddFast(ref uint64, t int64, v float64) error {
// Not implemented. // Not implemented.
return nil return storage.ErrNotFound
} }
func (a *collectResultAppender) Add(m labels.Labels, t int64, v float64) (uint64, error) { func (a *collectResultAppender) Add(m labels.Labels, t int64, v float64) (uint64, error) {

View file

@ -566,6 +566,9 @@ mainLoop:
if _, _, err := sl.append([]byte{}, staleTime); err != nil { if _, _, err := sl.append([]byte{}, staleTime); err != nil {
log.With("err", err).Error("stale append failed") log.With("err", err).Error("stale append failed")
} }
if err := sl.reportStale(staleTime); err != nil {
log.With("err", err).Error("stale report failed")
}
} }
// Stop the scraping. May still write data and stale markers after it has // Stop the scraping. May still write data and stale markers after it has
@ -738,13 +741,45 @@ func (sl *scrapeLoop) report(start time.Time, duration time.Duration, scraped, a
return app.Commit() return app.Commit()
} }
func (sl *scrapeLoop) reportStale(start time.Time) error {
ts := timestamp.FromTime(start)
app := sl.reportAppender()
stale := math.Float64frombits(value.StaleNaN)
if err := sl.addReportSample(app, scrapeHealthMetricName, ts, stale); err != nil {
app.Rollback()
return err
}
if err := sl.addReportSample(app, scrapeDurationMetricName, ts, stale); err != nil {
app.Rollback()
return err
}
if err := sl.addReportSample(app, scrapeSamplesMetricName, ts, stale); err != nil {
app.Rollback()
return err
}
if err := sl.addReportSample(app, samplesPostRelabelMetricName, ts, stale); err != nil {
app.Rollback()
return err
}
return app.Commit()
}
func (sl *scrapeLoop) addReportSample(app storage.Appender, s string, t int64, v float64) error { func (sl *scrapeLoop) addReportSample(app storage.Appender, s string, t int64, v float64) error {
ref, ok := sl.refCache[s] ref, ok := sl.refCache[s]
if ok { if ok {
if err := app.AddFast(ref, t, v); err == nil { err := app.AddFast(ref, t, v)
switch err {
case nil:
return nil return nil
} else if err != storage.ErrNotFound { case storage.ErrNotFound:
// Try an Add.
case storage.ErrOutOfOrderSample, storage.ErrDuplicateSampleForTimestamp:
// Do not log here, as this is expected if a target goes away and comes back
// again with a new scrape loop.
return nil
default:
return err return err
} }
} }
@ -752,10 +787,13 @@ func (sl *scrapeLoop) addReportSample(app storage.Appender, s string, t int64, v
labels.Label{Name: labels.MetricName, Value: s}, labels.Label{Name: labels.MetricName, Value: s},
} }
ref, err := app.Add(met, t, v) ref, err := app.Add(met, t, v)
if err != nil { switch err {
case nil:
sl.refCache[s] = ref
return nil
case storage.ErrOutOfOrderSample, storage.ErrDuplicateSampleForTimestamp:
return nil
default:
return err return err
} }
sl.refCache[s] = ref
return nil
} }

View file

@ -357,12 +357,13 @@ func TestScrapeLoopStopBeforeRun(t *testing.T) {
func TestScrapeLoopStop(t *testing.T) { func TestScrapeLoopStop(t *testing.T) {
appender := &collectResultAppender{} appender := &collectResultAppender{}
reportAppender := &collectResultAppender{}
var ( var (
signal = make(chan struct{}) signal = make(chan struct{})
scraper = &testScraper{} scraper = &testScraper{}
app = func() storage.Appender { return appender } app = func() storage.Appender { return appender }
reportApp = func() storage.Appender { return &nopAppender{} } reportApp = func() storage.Appender { return reportAppender }
numScrapes = 0 numScrapes = 0
) )
defer close(signal) defer close(signal)
@ -398,6 +399,20 @@ func TestScrapeLoopStop(t *testing.T) {
if !value.IsStaleNaN(appender.result[len(appender.result)-1].v) { if !value.IsStaleNaN(appender.result[len(appender.result)-1].v) {
t.Fatalf("Appended last sample not as expected. Wanted: stale NaN Got: %x", math.Float64bits(appender.result[len(appender.result)].v)) t.Fatalf("Appended last sample not as expected. Wanted: stale NaN Got: %x", math.Float64bits(appender.result[len(appender.result)].v))
} }
if len(reportAppender.result) < 8 {
t.Fatalf("Appended samples not as expected. Wanted: at least %d samples Got: %d", 8, len(reportAppender.result))
}
if len(reportAppender.result)%4 != 0 {
t.Fatalf("Appended samples not as expected. Wanted: samples mod 4 == 0 Got: %d samples", len(reportAppender.result))
}
if !value.IsStaleNaN(reportAppender.result[len(reportAppender.result)-1].v) {
t.Fatalf("Appended last sample not as expected. Wanted: stale NaN Got: %x", math.Float64bits(reportAppender.result[len(reportAppender.result)].v))
}
if reportAppender.result[len(reportAppender.result)-1].t != appender.result[len(appender.result)-1].t {
t.Fatalf("Expected last append and report sample to have same timestamp. Append: stale NaN Report: %x", appender.result[len(appender.result)-1].t, reportAppender.result[len(reportAppender.result)-1].t)
}
} }
func TestScrapeLoopRun(t *testing.T) { func TestScrapeLoopRun(t *testing.T) {