storage: Test for errors returned by MaybePopulateLastTime

This commit is contained in:
beorn7 2017-02-01 20:14:01 +01:00
parent 752fac60ae
commit 65dc8f44d3
4 changed files with 17 additions and 9 deletions

View file

@ -188,7 +188,9 @@ func (hs *headsScanner) scan() bool {
// This is NOT the head chunk. So it's a chunk // This is NOT the head chunk. So it's a chunk
// to be persisted, and we need to populate lastTime. // to be persisted, and we need to populate lastTime.
hs.chunksToPersistTotal++ hs.chunksToPersistTotal++
cd.MaybePopulateLastTime() if hs.err = cd.MaybePopulateLastTime(); hs.err != nil {
return false
}
} }
chunkDescs[i] = cd chunkDescs[i] = cd
} }

View file

@ -693,7 +693,7 @@ func (p *persistence) checkpointSeriesMapAndHeads(fingerprintToSeries *seriesMap
} }
// persistWatermark. We only checkpoint chunks that need persisting, so // persistWatermark. We only checkpoint chunks that need persisting, so
// this is always 0. // this is always 0.
if _, err = codable.EncodeVarint(w, int64(0)); err != nil { if _, err = codable.EncodeVarint(w, 0); err != nil {
return return
} }
if m.series.modTime.IsZero() { if m.series.modTime.IsZero() {

View file

@ -247,7 +247,9 @@ func (s *memorySeries) add(v model.SamplePair) (int, error) {
// Populate lastTime of now-closed chunks. // Populate lastTime of now-closed chunks.
for _, cd := range s.chunkDescs[len(s.chunkDescs)-len(chunks) : len(s.chunkDescs)-1] { for _, cd := range s.chunkDescs[len(s.chunkDescs)-len(chunks) : len(s.chunkDescs)-1] {
cd.MaybePopulateLastTime() if err := cd.MaybePopulateLastTime(); err != nil {
return 0, err
}
} }
s.lastTime = v.Timestamp s.lastTime = v.Timestamp
@ -261,19 +263,18 @@ func (s *memorySeries) add(v model.SamplePair) (int, error) {
// If the head chunk is already closed, the method is a no-op and returns false. // If the head chunk is already closed, the method is a no-op and returns false.
// //
// The caller must have locked the fingerprint of the series. // The caller must have locked the fingerprint of the series.
func (s *memorySeries) maybeCloseHeadChunk() bool { func (s *memorySeries) maybeCloseHeadChunk() (bool, error) {
if s.headChunkClosed { if s.headChunkClosed {
return false return false, nil
} }
if time.Now().Sub(s.lastTime.Time()) > headChunkTimeout { if time.Now().Sub(s.lastTime.Time()) > headChunkTimeout {
s.headChunkClosed = true s.headChunkClosed = true
// Since we cannot modify the head chunk from now on, we // Since we cannot modify the head chunk from now on, we
// don't need to bother with cloning anymore. // don't need to bother with cloning anymore.
s.headChunkUsedByIterator = false s.headChunkUsedByIterator = false
s.head().MaybePopulateLastTime() return true, s.head().MaybePopulateLastTime()
return true
} }
return false return false, nil
} }
// evictChunkDescs evicts chunkDescs if the chunk is evicted. // evictChunkDescs evicts chunkDescs if the chunk is evicted.

View file

@ -1376,7 +1376,12 @@ func (s *MemorySeriesStorage) maintainMemorySeries(
defer s.seriesOps.WithLabelValues(memoryMaintenance).Inc() defer s.seriesOps.WithLabelValues(memoryMaintenance).Inc()
if series.maybeCloseHeadChunk() { closed, err := series.maybeCloseHeadChunk()
if err != nil {
s.quarantineSeries(fp, series.metric, err)
s.persistErrors.Inc()
}
if closed {
s.incNumChunksToPersist(1) s.incNumChunksToPersist(1)
} }