mirror of
https://github.com/prometheus/prometheus.git
synced 2025-03-05 20:59:13 -08:00
Merge pull request #6951 from prometheus/beorn7/tsdb
tsdb: Do a full rollback upon commit error
This commit is contained in:
commit
6029fa0b1b
|
@ -116,7 +116,10 @@ type Appender interface {
|
||||||
// faster than adding a sample by providing its full label set.
|
// faster than adding a sample by providing its full label set.
|
||||||
AddFast(ref uint64, t int64, v float64) error
|
AddFast(ref uint64, t int64, v float64) error
|
||||||
|
|
||||||
// Commit submits the collected samples and purges the batch.
|
// Commit submits the collected samples and purges the batch. If Commit
|
||||||
|
// returns a non-nil error, it also rolls back all modifications made in
|
||||||
|
// the appender so far, as Rollback would do. In any case, an Appender
|
||||||
|
// must not be used anymore after Commit has been called.
|
||||||
Commit() error
|
Commit() error
|
||||||
|
|
||||||
// Rollback rolls back all modifications made in the appender so far.
|
// Rollback rolls back all modifications made in the appender so far.
|
||||||
|
|
11
tsdb/head.go
11
tsdb/head.go
|
@ -1042,15 +1042,17 @@ func (a *headAppender) log() error {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *headAppender) Commit() error {
|
func (a *headAppender) Commit() error {
|
||||||
|
if err := a.log(); err != nil {
|
||||||
|
//nolint: errcheck
|
||||||
|
a.Rollback() // Most likely the same error will happen again.
|
||||||
|
return errors.Wrap(err, "write to WAL")
|
||||||
|
}
|
||||||
|
|
||||||
defer a.head.metrics.activeAppenders.Dec()
|
defer a.head.metrics.activeAppenders.Dec()
|
||||||
defer a.head.putAppendBuffer(a.samples)
|
defer a.head.putAppendBuffer(a.samples)
|
||||||
defer a.head.putSeriesBuffer(a.sampleSeries)
|
defer a.head.putSeriesBuffer(a.sampleSeries)
|
||||||
defer a.head.iso.closeAppend(a.appendID)
|
defer a.head.iso.closeAppend(a.appendID)
|
||||||
|
|
||||||
if err := a.log(); err != nil {
|
|
||||||
return errors.Wrap(err, "write to WAL")
|
|
||||||
}
|
|
||||||
|
|
||||||
total := len(a.samples)
|
total := len(a.samples)
|
||||||
var series *memSeries
|
var series *memSeries
|
||||||
for i, s := range a.samples {
|
for i, s := range a.samples {
|
||||||
|
@ -1088,6 +1090,7 @@ func (a *headAppender) Rollback() error {
|
||||||
}
|
}
|
||||||
a.head.putAppendBuffer(a.samples)
|
a.head.putAppendBuffer(a.samples)
|
||||||
a.samples = nil
|
a.samples = nil
|
||||||
|
a.head.putSeriesBuffer(a.sampleSeries)
|
||||||
a.head.iso.closeAppend(a.appendID)
|
a.head.iso.closeAppend(a.appendID)
|
||||||
|
|
||||||
// Series are created in the head memory regardless of rollback. Thus we have
|
// Series are created in the head memory regardless of rollback. Thus we have
|
||||||
|
|
Loading…
Reference in a new issue