Move crucial actions to defer (#6918)

With defer having less of a performance penalty, there is no reason
not to do those crucial operations via defer.

Context: With isolation in place, if we forget to Commit/Rollback, the
low watermark will get stuck forever.

The current code should not have any bugs, but moving to defer helps
to avoid future bugs.

This is also moving the `closeAppend` in the `Commit` implementation
itself to defer. If logging to the WAL fails, we would have missed the
`closeAppend`.

Signed-off-by: beorn7 <beorn@grafana.com>
This commit is contained in:
Björn Rabenstein 2020-03-13 20:54:47 +01:00 committed by GitHub
parent bf0055536d
commit d80b0810c1
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 65 additions and 57 deletions

View file

@ -590,6 +590,13 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) {
app := g.opts.Appendable.Appender() app := g.opts.Appendable.Appender()
seriesReturned := make(map[string]labels.Labels, len(g.seriesInPreviousEval[i])) seriesReturned := make(map[string]labels.Labels, len(g.seriesInPreviousEval[i]))
defer func() {
if err := app.Commit(); err != nil {
level.Warn(g.logger).Log("msg", "rule sample appending failed", "err", err)
return
}
g.seriesInPreviousEval[i] = seriesReturned
}()
for _, s := range vector { for _, s := range vector {
if _, err := app.Add(s.Metric, s.T, s.V); err != nil { if _, err := app.Add(s.Metric, s.T, s.V); err != nil {
switch err { switch err {
@ -627,11 +634,6 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) {
} }
} }
} }
if err := app.Commit(); err != nil {
level.Warn(g.logger).Log("msg", "rule sample appending failed", "err", err)
} else {
g.seriesInPreviousEval[i] = seriesReturned
}
}(i, rule) }(i, rule)
} }
g.cleanupStaleSeries(ts) g.cleanupStaleSeries(ts)

View file

@ -1069,6 +1069,19 @@ func (sl *scrapeLoop) append(b []byte, contentType string, ts time.Time) (total,
) )
var sampleLimitErr error var sampleLimitErr error
defer func() {
if err != nil {
app.Rollback()
return
}
if err = app.Commit(); err != nil {
return
}
// Only perform cache cleaning if the scrape was not empty.
// An empty scrape (usually) is used to indicate a failed scrape.
sl.cache.iterDone(len(b) > 0)
}()
loop: loop:
for { for {
var et textparse.Entry var et textparse.Entry
@ -1229,19 +1242,7 @@ loop:
return err == nil return err == nil
}) })
} }
if err != nil { return
app.Rollback()
return total, added, seriesAdded, err
}
if err := app.Commit(); err != nil {
return total, added, seriesAdded, err
}
// Only perform cache cleaning if the scrape was not empty.
// An empty scrape (usually) is used to indicate a failed scrape.
sl.cache.iterDone(len(b) > 0)
return total, added, seriesAdded, nil
} }
func yoloString(b []byte) string { func yoloString(b []byte) string {
@ -1258,67 +1259,71 @@ const (
scrapeSeriesAddedMetricName = "scrape_series_added" + "\xff" scrapeSeriesAddedMetricName = "scrape_series_added" + "\xff"
) )
func (sl *scrapeLoop) report(start time.Time, duration time.Duration, scraped, appended, seriesAdded int, err error) error { func (sl *scrapeLoop) report(start time.Time, duration time.Duration, scraped, appended, seriesAdded int, scrapeErr error) (err error) {
sl.scraper.Report(start, duration, err) sl.scraper.Report(start, duration, scrapeErr)
ts := timestamp.FromTime(start) ts := timestamp.FromTime(start)
var health float64 var health float64
if err == nil { if scrapeErr == nil {
health = 1 health = 1
} }
app := sl.appender() app := sl.appender()
defer func() {
if err != nil {
app.Rollback()
return
}
err = app.Commit()
}()
if err := sl.addReportSample(app, scrapeHealthMetricName, ts, health); err != nil { if err = sl.addReportSample(app, scrapeHealthMetricName, ts, health); err != nil {
app.Rollback() return
return err
} }
if err := sl.addReportSample(app, scrapeDurationMetricName, ts, duration.Seconds()); err != nil { if err = sl.addReportSample(app, scrapeDurationMetricName, ts, duration.Seconds()); err != nil {
app.Rollback() return
return err
} }
if err := sl.addReportSample(app, scrapeSamplesMetricName, ts, float64(scraped)); err != nil { if err = sl.addReportSample(app, scrapeSamplesMetricName, ts, float64(scraped)); err != nil {
app.Rollback() return
return err
} }
if err := sl.addReportSample(app, samplesPostRelabelMetricName, ts, float64(appended)); err != nil { if err = sl.addReportSample(app, samplesPostRelabelMetricName, ts, float64(appended)); err != nil {
app.Rollback() return
return err
} }
if err := sl.addReportSample(app, scrapeSeriesAddedMetricName, ts, float64(seriesAdded)); err != nil { if err = sl.addReportSample(app, scrapeSeriesAddedMetricName, ts, float64(seriesAdded)); err != nil {
app.Rollback() return
return err
} }
return app.Commit() return
} }
func (sl *scrapeLoop) reportStale(start time.Time) error { func (sl *scrapeLoop) reportStale(start time.Time) (err error) {
ts := timestamp.FromTime(start) ts := timestamp.FromTime(start)
app := sl.appender() app := sl.appender()
defer func() {
if err != nil {
app.Rollback()
return
}
err = app.Commit()
}()
stale := math.Float64frombits(value.StaleNaN) stale := math.Float64frombits(value.StaleNaN)
if err := sl.addReportSample(app, scrapeHealthMetricName, ts, stale); err != nil { if err = sl.addReportSample(app, scrapeHealthMetricName, ts, stale); err != nil {
app.Rollback() return
return err
} }
if err := sl.addReportSample(app, scrapeDurationMetricName, ts, stale); err != nil { if err = sl.addReportSample(app, scrapeDurationMetricName, ts, stale); err != nil {
app.Rollback() return
return err
} }
if err := sl.addReportSample(app, scrapeSamplesMetricName, ts, stale); err != nil { if err = sl.addReportSample(app, scrapeSamplesMetricName, ts, stale); err != nil {
app.Rollback() return
return err
} }
if err := sl.addReportSample(app, samplesPostRelabelMetricName, ts, stale); err != nil { if err = sl.addReportSample(app, samplesPostRelabelMetricName, ts, stale); err != nil {
app.Rollback() return
return err
} }
if err := sl.addReportSample(app, scrapeSeriesAddedMetricName, ts, stale); err != nil { if err = sl.addReportSample(app, scrapeSeriesAddedMetricName, ts, stale); err != nil {
app.Rollback() return
return err
} }
return app.Commit() return
} }
func (sl *scrapeLoop) addReportSample(app storage.Appender, s string, t int64, v float64) error { func (sl *scrapeLoop) addReportSample(app storage.Appender, s string, t int64, v float64) error {

View file

@ -1079,7 +1079,10 @@ func (a *headAppender) Commit() error {
} }
func (a *headAppender) Rollback() error { func (a *headAppender) Rollback() error {
a.head.metrics.activeAppenders.Dec() defer a.head.metrics.activeAppenders.Dec()
defer a.head.iso.closeAppend(a.appendID)
defer a.head.putSeriesBuffer(a.sampleSeries)
var series *memSeries var series *memSeries
for i := range a.samples { for i := range a.samples {
series = a.sampleSeries[i] series = a.sampleSeries[i]
@ -1090,8 +1093,6 @@ func (a *headAppender) Rollback() error {
} }
a.head.putAppendBuffer(a.samples) a.head.putAppendBuffer(a.samples)
a.samples = nil a.samples = nil
a.head.putSeriesBuffer(a.sampleSeries)
a.head.iso.closeAppend(a.appendID)
// Series are created in the head memory regardless of rollback. Thus we have // Series are created in the head memory regardless of rollback. Thus we have
// to log them to the WAL in any case. // to log them to the WAL in any case.