Fix error checking and logging around checkpointing.

This commit is contained in:
Julius Volz 2015-09-07 18:08:23 +02:00
parent 09c353c272
commit 6774a73878
2 changed files with 30 additions and 14 deletions

View file

@ -540,15 +540,19 @@ func (p *persistence) checkpointSeriesMapAndHeads(fingerprintToSeries *seriesMap
begin := time.Now() begin := time.Now()
f, err := os.OpenFile(p.headsTempFileName(), os.O_WRONLY|os.O_TRUNC|os.O_CREATE, 0640) f, err := os.OpenFile(p.headsTempFileName(), os.O_WRONLY|os.O_TRUNC|os.O_CREATE, 0640)
if err != nil { if err != nil {
return return err
} }
defer func() { defer func() {
f.Sync() syncErr := f.Sync()
closeErr := f.Close() closeErr := f.Close()
if err != nil { if err != nil {
return return
} }
err = syncErr
if err != nil {
return
}
err = closeErr err = closeErr
if err != nil { if err != nil {
return return
@ -562,18 +566,18 @@ func (p *persistence) checkpointSeriesMapAndHeads(fingerprintToSeries *seriesMap
w := bufio.NewWriterSize(f, fileBufSize) w := bufio.NewWriterSize(f, fileBufSize)
if _, err = w.WriteString(headsMagicString); err != nil { if _, err = w.WriteString(headsMagicString); err != nil {
return return err
} }
var numberOfSeriesOffset int var numberOfSeriesOffset int
if numberOfSeriesOffset, err = codable.EncodeVarint(w, headsFormatVersion); err != nil { if numberOfSeriesOffset, err = codable.EncodeVarint(w, headsFormatVersion); err != nil {
return return err
} }
numberOfSeriesOffset += len(headsMagicString) numberOfSeriesOffset += len(headsMagicString)
numberOfSeriesInHeader := uint64(fingerprintToSeries.length()) numberOfSeriesInHeader := uint64(fingerprintToSeries.length())
// We have to write the number of series as uint64 because we might need // We have to write the number of series as uint64 because we might need
// to overwrite it later, and a varint might change byte width then. // to overwrite it later, and a varint might change byte width then.
if err = codable.EncodeUint64(w, numberOfSeriesInHeader); err != nil { if err = codable.EncodeUint64(w, numberOfSeriesInHeader); err != nil {
return return err
} }
iter := fingerprintToSeries.iter() iter := fingerprintToSeries.iter()
@ -606,7 +610,9 @@ func (p *persistence) checkpointSeriesMapAndHeads(fingerprintToSeries *seriesMap
if err != nil { if err != nil {
return return
} }
w.Write(buf) if _, err = w.Write(buf); err != nil {
return
}
if _, err = codable.EncodeVarint(w, int64(m.series.persistWatermark)); err != nil { if _, err = codable.EncodeVarint(w, int64(m.series.persistWatermark)); err != nil {
return return
} }
@ -646,27 +652,33 @@ func (p *persistence) checkpointSeriesMapAndHeads(fingerprintToSeries *seriesMap
} }
} }
} }
// Series is checkpointed now, so declare it clean. // Series is checkpointed now, so declare it clean. In case the entire
// checkpoint fails later on, this is fine, as the storage's series
// maintenance will mark these series newly dirty again, continuously
// increasing the total number of dirty series as seen by the storage.
// This has the effect of triggering a new checkpoint attempt even
// earlier than if we hadn't incorrectly set "dirty" to "false" here
// already.
m.series.dirty = false m.series.dirty = false
}() }()
if err != nil { if err != nil {
return return err
} }
} }
if err = w.Flush(); err != nil { if err = w.Flush(); err != nil {
return return err
} }
if realNumberOfSeries != numberOfSeriesInHeader { if realNumberOfSeries != numberOfSeriesInHeader {
// The number of series has changed in the meantime. // The number of series has changed in the meantime.
// Rewrite it in the header. // Rewrite it in the header.
if _, err = f.Seek(int64(numberOfSeriesOffset), os.SEEK_SET); err != nil { if _, err = f.Seek(int64(numberOfSeriesOffset), os.SEEK_SET); err != nil {
return return err
} }
if err = codable.EncodeUint64(f, realNumberOfSeries); err != nil { if err = codable.EncodeUint64(f, realNumberOfSeries); err != nil {
return return err
} }
} }
return return err
} }
// loadSeriesMapAndHeads loads the fingerprint to memory-series mapping and all // loadSeriesMapAndHeads loads the fingerprint to memory-series mapping and all

View file

@ -875,8 +875,12 @@ loop:
case <-s.loopStopping: case <-s.loopStopping:
break loop break loop
case <-checkpointTimer.C: case <-checkpointTimer.C:
s.persistence.checkpointSeriesMapAndHeads(s.fpToSeries, s.fpLocker) err := s.persistence.checkpointSeriesMapAndHeads(s.fpToSeries, s.fpLocker)
if err != nil {
log.Errorln("Error while checkpointing:", err)
} else {
dirtySeriesCount = 0 dirtySeriesCount = 0
}
checkpointTimer.Reset(s.checkpointInterval) checkpointTimer.Reset(s.checkpointInterval)
case fp := <-memoryFingerprints: case fp := <-memoryFingerprints:
if s.maintainMemorySeries(fp, model.Now().Add(-s.dropAfter)) { if s.maintainMemorySeries(fp, model.Now().Add(-s.dropAfter)) {