Merge pull request #1058 from prometheus/check-errors

Fix error checking and logging around checkpointing.
This commit is contained in:
Julius Volz 2015-09-07 19:57:16 +02:00
commit ffc5142c54
2 changed files with 30 additions and 14 deletions

View file

@ -540,15 +540,19 @@ func (p *persistence) checkpointSeriesMapAndHeads(fingerprintToSeries *seriesMap
begin := time.Now()
f, err := os.OpenFile(p.headsTempFileName(), os.O_WRONLY|os.O_TRUNC|os.O_CREATE, 0640)
if err != nil {
return
return err
}
defer func() {
f.Sync()
syncErr := f.Sync()
closeErr := f.Close()
if err != nil {
return
}
err = syncErr
if err != nil {
return
}
err = closeErr
if err != nil {
return
@ -562,18 +566,18 @@ func (p *persistence) checkpointSeriesMapAndHeads(fingerprintToSeries *seriesMap
w := bufio.NewWriterSize(f, fileBufSize)
if _, err = w.WriteString(headsMagicString); err != nil {
return
return err
}
var numberOfSeriesOffset int
if numberOfSeriesOffset, err = codable.EncodeVarint(w, headsFormatVersion); err != nil {
return
return err
}
numberOfSeriesOffset += len(headsMagicString)
numberOfSeriesInHeader := uint64(fingerprintToSeries.length())
// We have to write the number of series as uint64 because we might need
// to overwrite it later, and a varint might change byte width then.
if err = codable.EncodeUint64(w, numberOfSeriesInHeader); err != nil {
return
return err
}
iter := fingerprintToSeries.iter()
@ -606,7 +610,9 @@ func (p *persistence) checkpointSeriesMapAndHeads(fingerprintToSeries *seriesMap
if err != nil {
return
}
w.Write(buf)
if _, err = w.Write(buf); err != nil {
return
}
if _, err = codable.EncodeVarint(w, int64(m.series.persistWatermark)); err != nil {
return
}
@ -646,27 +652,33 @@ func (p *persistence) checkpointSeriesMapAndHeads(fingerprintToSeries *seriesMap
}
}
}
// Series is checkpointed now, so declare it clean.
// Series is checkpointed now, so declare it clean. In case the entire
// checkpoint fails later on, this is fine, as the storage's series
// maintenance will mark these series newly dirty again, continuously
// increasing the total number of dirty series as seen by the storage.
// This has the effect of triggering a new checkpoint attempt even
// earlier than if we hadn't incorrectly set "dirty" to "false" here
// already.
m.series.dirty = false
}()
if err != nil {
return
return err
}
}
if err = w.Flush(); err != nil {
return
return err
}
if realNumberOfSeries != numberOfSeriesInHeader {
// The number of series has changed in the meantime.
// Rewrite it in the header.
if _, err = f.Seek(int64(numberOfSeriesOffset), os.SEEK_SET); err != nil {
return
return err
}
if err = codable.EncodeUint64(f, realNumberOfSeries); err != nil {
return
return err
}
}
return
return err
}
// loadSeriesMapAndHeads loads the fingerprint to memory-series mapping and all

View file

@ -875,8 +875,12 @@ loop:
case <-s.loopStopping:
break loop
case <-checkpointTimer.C:
s.persistence.checkpointSeriesMapAndHeads(s.fpToSeries, s.fpLocker)
dirtySeriesCount = 0
err := s.persistence.checkpointSeriesMapAndHeads(s.fpToSeries, s.fpLocker)
if err != nil {
log.Errorln("Error while checkpointing:", err)
} else {
dirtySeriesCount = 0
}
checkpointTimer.Reset(s.checkpointInterval)
case fp := <-memoryFingerprints:
if s.maintainMemorySeries(fp, model.Now().Add(-s.dropAfter)) {