From f6f4fd6556d01ea1c7d093173127f6a0446a28f9 Mon Sep 17 00:00:00 2001 From: beorn7 Date: Mon, 9 Mar 2020 18:24:18 +0100 Subject: [PATCH] tsdb: Do a full rollback upon commit error I think the previous behavior is problematic as it will leave `memSeries` around that still have `pendingCommit` set to `true`. The only case where this can happen in this code path is a failure to write to the WAL, in which case we are probably in trouble anyway. I believe, however, we should still try to do the right thing and do the full rollback. This will implicitly try to write to the WAL again, but this time without samples, which may even succeed. (But we propagate the previous error in any case.) This also adds `a.head.putSeriesBuffer(a.sampleSeries)` to Rollback, which was previously missing. Signed-off-by: beorn7 --- storage/interface.go | 5 ++++- tsdb/head.go | 11 +++++++---- 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/storage/interface.go b/storage/interface.go index 539c617b6..e85a159e4 100644 --- a/storage/interface.go +++ b/storage/interface.go @@ -116,7 +116,10 @@ type Appender interface { // faster than adding a sample by providing its full label set. AddFast(ref uint64, t int64, v float64) error - // Commit submits the collected samples and purges the batch. + // Commit submits the collected samples and purges the batch. If Commit + // returns a non-nil error, it also rolls back all modifications made in + // the appender so far, as Rollback would do. In any case, an Appender + // must not be used anymore after Commit has been called. Commit() error // Rollback rolls back all modifications made in the appender so far. diff --git a/tsdb/head.go b/tsdb/head.go index 7c05bd01b..0de043000 100644 --- a/tsdb/head.go +++ b/tsdb/head.go @@ -1042,15 +1042,17 @@ func (a *headAppender) log() error { } func (a *headAppender) Commit() error { + if err := a.log(); err != nil { + //nolint: errcheck + a.Rollback() // Most likely the same error will happen again. + return errors.Wrap(err, "write to WAL") + } + defer a.head.metrics.activeAppenders.Dec() defer a.head.putAppendBuffer(a.samples) defer a.head.putSeriesBuffer(a.sampleSeries) defer a.head.iso.closeAppend(a.appendID) - if err := a.log(); err != nil { - return errors.Wrap(err, "write to WAL") - } - total := len(a.samples) var series *memSeries for i, s := range a.samples { @@ -1088,6 +1090,7 @@ func (a *headAppender) Rollback() error { } a.head.putAppendBuffer(a.samples) a.samples = nil + a.head.putSeriesBuffer(a.sampleSeries) a.head.iso.closeAppend(a.appendID) // Series are created in the head memory regardless of rollback. Thus we have