Fix a bug handling freshly unarchived series.

Usually, a series is unarchived in order to add something to it, which
will create a new head chunk. However, if a series is unarchived and
then handled by the maintenance loop before anything has been added to
it, it will be archived again. In that case, we have to load the
chunkDescs to know the lastTime of the series to be archived. This
case will usually happen only rarely (as a race; it has never happened
so far, possibly because the locking around unarchiving and the
subsequent sample append is smart enough). However, during crash
recovery, we sometimes treat series as "freshly unarchived" without
directly appending a sample. We might add more cases of that type
later, so it is better to handle archiving properly and load the
chunkDescs if required.
Bjoern Rabenstein 2015-01-08 16:10:31 +01:00
parent 7aae5dd873
commit 622e8350cd
5 changed files with 36 additions and 25 deletions
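
The essence of the change is a guard added to the maintenance loop
before a series is archived. The snippet below is a condensed sketch of
the storage hunk further down (it sits inside
memorySeriesStorage.maintainSeries and reuses the existing
loadChunkDescs, archiveMetric, and head helpers), not the exact
committed code:

	// A freshly unarchived series has no chunkDescs in memory yet, so
	// load them from the series file first. Otherwise head() would
	// panic and we could not determine the lastTime of the series.
	if len(series.chunkDescs) == 0 {
		cds, err := s.loadChunkDescs(fp, math.MaxInt64)
		if err != nil {
			glog.Errorf("Could not load chunk descriptors prior to archiving metric %v, metric will not be archived: %v", series.metric, err)
			return
		}
		series.chunkDescs = cds
	}
	if err := s.persistence.archiveMetric(
		fp, series.metric, series.firstTime(), series.head().lastTime(),
	); err != nil {
		glog.Errorf("Error archiving metric %v: %v", series.metric, err)
		return
	}
	s.seriesOps.WithLabelValues(archive).Inc()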


@@ -14,9 +14,9 @@
 package local
 import (
-	"time"
 	clientmodel "github.com/prometheus/client_golang/model"
 	"github.com/prometheus/client_golang/prometheus"
+	"time"
 	"github.com/prometheus/prometheus/storage/metric"
 )


@@ -85,7 +85,7 @@ type indexingOp struct {
 // A Persistence is used by a Storage implementation to store samples
 // persistently across restarts. The methods are only goroutine-safe if
 // explicitly marked as such below. The chunk-related methods PersistChunk,
-// DropChunks, LoadChunks, and LoadChunkDescs can be called concurrently with
+// dropChunks, loadChunks, and loadChunkDescs can be called concurrently with
 // each other if each call refers to a different fingerprint.
 type persistence struct {
 	basePath string
@@ -350,26 +350,31 @@ func (p *persistence) recoverFromCrash(fingerprintToSeries map[clientmodel.Finge
 	return nil
 }
-// sanitizeSeries sanitizes a series based on its series file as defined by the provided directory and FileInfo.
-// The method returns the fingerprint as derived from the directory and file name, and whether the provided
-// file has been sanitized. A file that failed to be sanitized is deleted, if possible.
+// sanitizeSeries sanitizes a series based on its series file as defined by the
+// provided directory and FileInfo. The method returns the fingerprint as
+// derived from the directory and file name, and whether the provided file has
+// been sanitized. A file that failed to be sanitized is deleted, if possible.
 //
 // The following steps are performed:
 //
-// - A file whose name doesn't comply with the naming scheme of a series file is simply deleted.
+// - A file whose name doesn't comply with the naming scheme of a series file is
+// simply deleted.
 //
-// - If the size of the series file isn't a multiple of the chunk size, extraneous bytes are truncated.
-// If the truncation fails, the file is deleted instead.
+// - If the size of the series file isn't a multiple of the chunk size,
+// extraneous bytes are truncated. If the truncation fails, the file is
+// deleted instead.
 //
 // - A file that is empty (after truncation) is deleted.
 //
-// - A series that is not archived (i.e. it is in the fingerprintToSeries map) is checked for consistency of
-// its various parameters (like head-chunk persistence state, offset of chunkDescs etc.). In particular,
-// overlap between an in-memory head chunk with the most recent persisted chunk is checked. Inconsistencies
-// are rectified.
+// - A series that is not archived (i.e. it is in the fingerprintToSeries map)
+// is checked for consistency of its various parameters (like head-chunk
+// persistence state, offset of chunkDescs etc.). In particular, overlap
+// between an in-memory head chunk with the most recent persisted chunk is
+// checked. Inconsistencies are rectified.
 //
-// - A series that is archived (i.e. it is not in the fingerprintToSeries map) is checked for its presence
-// in the index of archived series. If it cannot be found there, it is deleted.
+// - A series that is archived (i.e. it is not in the fingerprintToSeries map)
+// is checked for its presence in the index of archived series. If it cannot
+// be found there, it is deleted.
 func (p *persistence) sanitizeSeries(dirname string, fi os.FileInfo, fingerprintToSeries map[clientmodel.Fingerprint]*memorySeries) (clientmodel.Fingerprint, bool) {
 	filename := path.Join(dirname, fi.Name())
 	purge := func() {


@@ -397,7 +397,8 @@ func (s *memorySeries) newIterator(lockFunc, unlockFunc func()) SeriesIterator {
 }
 // head returns a pointer to the head chunk descriptor. The caller must have
-// locked the fingerprint of the memorySeries.
+// locked the fingerprint of the memorySeries. This method will panic if this
+// series has no chunk descriptors.
 func (s *memorySeries) head() *chunkDesc {
 	return s.chunkDescs[len(s.chunkDescs)-1]
 }
@@ -411,12 +412,6 @@ func (s *memorySeries) firstTime() clientmodel.Timestamp {
 	return s.savedFirstTime
 }
-// lastTime returns the timestamp of the last sample in the series. The caller
-// must have locked the fingerprint of the memorySeries.
-func (s *memorySeries) lastTime() clientmodel.Timestamp {
-	return s.head().lastTime()
-}
 // memorySeriesIterator implements SeriesIterator.
 type memorySeriesIterator struct {
 	lock, unlock func()


@@ -16,6 +16,7 @@ package local
 import (
 	"container/list"
+	"math"
 	"sync/atomic"
 	"time"
@@ -704,13 +705,23 @@ func (s *memorySeriesStorage) maintainSeries(fp clientmodel.Fingerprint) {
 		if iOldestNotEvicted == -1 {
 			s.fpToSeries.del(fp)
 			s.numSeries.Dec()
+			// Make sure we have a head chunk descriptor (a freshly
+			// unarchived series has none).
+			if len(series.chunkDescs) == 0 {
+				cds, err := s.loadChunkDescs(fp, math.MaxInt64)
+				if err != nil {
+					glog.Errorf("Could not load chunk descriptors prior to archiving metric %v, metric will not be archived: %v", series.metric, err)
+					return
+				}
+				series.chunkDescs = cds
+			}
 			if err := s.persistence.archiveMetric(
-				fp, series.metric, series.firstTime(), series.lastTime(),
+				fp, series.metric, series.firstTime(), series.head().lastTime(),
 			); err != nil {
 				glog.Errorf("Error archiving metric %v: %v", series.metric, err)
-			} else {
-				s.seriesOps.WithLabelValues(archive).Inc()
+				return
 			}
+			s.seriesOps.WithLabelValues(archive).Inc()
 			return
 		}
 		// If we are here, the series is not archived, so check for chunkDesc


@@ -382,7 +382,7 @@ func TestEvictAndPurgeSeries(t *testing.T) {
 	// Archive metrics.
 	ms.fpToSeries.del(fp)
 	if err := ms.persistence.archiveMetric(
-		fp, series.metric, series.firstTime(), series.lastTime(),
+		fp, series.metric, series.firstTime(), series.head().lastTime(),
 	); err != nil {
 		t.Fatal(err)
 	}