Completed more TODOs.

- Documented checkpoint file format.
- High-level description of series sanitation.
- Replace fp.LoadFromString panic with an error.
  (Change in client_golang already submitted.)
- Introduced checks for series file size where appropriate.
- Removed two Law of Demeter violations.

Change-Id: I555d97a2c8f4769820c2fc8bf5d6f4e160222abc
This commit is contained in:
Bjoern Rabenstein 2014-11-27 20:46:45 +01:00
parent 7d11019aa2
commit 674624f1c8
4 changed files with 90 additions and 23 deletions

View file

@ -349,7 +349,26 @@ func (p *persistence) recoverFromCrash(fingerprintToSeries map[clientmodel.Finge
return nil return nil
} }
// TODO: Document. // sanitizeSeries sanitizes a series based on its series file as defined by the provided directory and FileInfo.
// The method returns the fingerprint as derived from the directory and file name, and whether the provided
// file has been sanatizide. A file that failed to be sanitized is deleted, if possible.
//
// The following steps are performed:
//
// - A file whose name doesn't comply with the naming scheme of a series file is simply deleted.
//
// - If the size of the series file isn't a multiple of the chunk size, extraneous bytes are truncated.
// If the truncation fails, the file is deleted instead.
//
// - A file that is empty (after truncation) is deleted.
//
// - A series that is not archived (i.e. it is in the fingerprintToSeries map) is checked for consistency of
// its various parameters (like head-chunk persistence state, offset of chunkDescs etc.). In particular,
// overlap between an in-memory head chunk with the most recent persisted chunk is checked. Inconsistencies
// are rectified.
//
// - A series this in archived (i.e. it is not in the fingerprintToSeries map) is checked for its presence
// in the index of archived series. If it cannot be found there, it is deleted.
func (p *persistence) sanitizeSeries(dirname string, fi os.FileInfo, fingerprintToSeries map[clientmodel.Fingerprint]*memorySeries) (clientmodel.Fingerprint, bool) { func (p *persistence) sanitizeSeries(dirname string, fi os.FileInfo, fingerprintToSeries map[clientmodel.Fingerprint]*memorySeries) (clientmodel.Fingerprint, bool) {
filename := path.Join(dirname, fi.Name()) filename := path.Join(dirname, fi.Name())
purge := func() { purge := func() {
@ -364,7 +383,11 @@ func (p *persistence) sanitizeSeries(dirname string, fi os.FileInfo, fingerprint
purge() purge()
return fp, false return fp, false
} }
fp.LoadFromString(path.Base(dirname) + fi.Name()[:fpLen-seriesDirNameLen]) // TODO: Panics if that doesn't parse as hex. if err := fp.LoadFromString(path.Base(dirname) + fi.Name()[:fpLen-seriesDirNameLen]); err != nil {
glog.Warningf("Error parsing file name %s: %s", filename, err)
purge()
return fp, false
}
bytesToTrim := fi.Size() % int64(p.chunkLen+chunkHeaderLen) bytesToTrim := fi.Size() % int64(p.chunkLen+chunkHeaderLen)
chunksInFile := int(fi.Size()) / (p.chunkLen + chunkHeaderLen) chunksInFile := int(fi.Size()) / (p.chunkLen + chunkHeaderLen)
@ -676,9 +699,6 @@ func (p *persistence) persistChunk(fp clientmodel.Fingerprint, c chunk) (int, er
// each index in indexes. It is the caller's responsibility to not persist or // each index in indexes. It is the caller's responsibility to not persist or
// drop anything for the same fingerprint concurrently. // drop anything for the same fingerprint concurrently.
func (p *persistence) loadChunks(fp clientmodel.Fingerprint, indexes []int, indexOffset int) ([]chunk, error) { func (p *persistence) loadChunks(fp clientmodel.Fingerprint, indexes []int, indexOffset int) ([]chunk, error) {
// TODO: we need to verify at some point that file length is a multiple of
// the chunk size. When is the best time to do this, and where to remember
// it? Right now, we only do it when loading chunkDescs.
f, err := p.openChunkFileForReading(fp) f, err := p.openChunkFileForReading(fp)
if err != nil { if err != nil {
return nil, err return nil, err
@ -731,15 +751,11 @@ func (p *persistence) loadChunkDescs(fp clientmodel.Fingerprint, beforeTime clie
} }
totalChunkLen := chunkHeaderLen + p.chunkLen totalChunkLen := chunkHeaderLen + p.chunkLen
if fi.Size()%int64(totalChunkLen) != 0 { if fi.Size()%int64(totalChunkLen) != 0 {
// TODO: record number of encountered corrupt series files in a metric? p.setDirty(true)
return nil, fmt.Errorf(
// Truncate the file size to the nearest multiple of chunkLen. "size of series file for fingerprint %v is %d, which is not a multiple of the chunk length %d",
truncateTo := fi.Size() - fi.Size()%int64(totalChunkLen) fp, fi.Size(), totalChunkLen,
glog.Infof("Bad series file size for %s: %d bytes (no multiple of %d). Truncating to %d bytes.", fp, fi.Size(), totalChunkLen, truncateTo) )
// TODO: this doesn't work, as this is a read-only file handle.
if err := f.Truncate(truncateTo); err != nil {
return nil, err
}
} }
numChunks := int(fi.Size()) / totalChunkLen numChunks := int(fi.Size()) / totalChunkLen
@ -773,6 +789,42 @@ func (p *persistence) loadChunkDescs(fp clientmodel.Fingerprint, beforeTime clie
// checkpointSeriesMapAndHeads persists the fingerprint to memory-series mapping // checkpointSeriesMapAndHeads persists the fingerprint to memory-series mapping
// and all open (non-full) head chunks. Do not call concurrently with // and all open (non-full) head chunks. Do not call concurrently with
// loadSeriesMapAndHeads. // loadSeriesMapAndHeads.
//
// Description of the file format:
//
// (1) Magic string (const headsMagicString).
//
// (2) Varint-encoded format version (const headsFormatVersion).
//
// (3) Number of series in checkpoint as big-endian uint64.
//
// (4) Repeated once per series:
//
// (4.1) A flag byte, see flag constants above.
//
// (4.2) The fingerprint as big-endian uint64.
//
// (4.3) The metric as defined by codable.Metric.
//
// (4.4) The varint-encoded chunkDescsOffset.
//
// (4.5) The varint-encoded savedFirstTime.
//
// (4.6) The varint-encoded number of chunk descriptors.
//
// (4.7) Repeated once per chunk descriptor, oldest to most recent:
//
// (4.7.1) The varint-encoded first time.
//
// (4.7.2) The varint-encoded last time.
//
// (4.8) Exception to 4.7: If the most recent chunk is a non-persisted head chunk,
// the following is persisted instead of the most recent chunk descriptor:
//
// (4.8.1) A byte defining the chunk type.
//
// (4.8.2) The head chunk itself, marshaled with the marshal() method.
//
func (p *persistence) checkpointSeriesMapAndHeads(fingerprintToSeries *seriesMap, fpLocker *fingerprintLocker) (err error) { func (p *persistence) checkpointSeriesMapAndHeads(fingerprintToSeries *seriesMap, fpLocker *fingerprintLocker) (err error) {
glog.Info("Checkpointing in-memory metrics and head chunks...") glog.Info("Checkpointing in-memory metrics and head chunks...")
begin := time.Now() begin := time.Now()
@ -1328,7 +1380,18 @@ func (p *persistence) openChunkFileForWriting(fp clientmodel.Fingerprint) (*os.F
if err := os.MkdirAll(p.dirNameForFingerprint(fp), 0700); err != nil { if err := os.MkdirAll(p.dirNameForFingerprint(fp), 0700); err != nil {
return nil, err return nil, err
} }
return os.OpenFile(p.fileNameForFingerprint(fp), os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0640) f, err := os.OpenFile(p.fileNameForFingerprint(fp), os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0640)
if err != nil {
return f, err
}
offset, err := f.Seek(0, os.SEEK_CUR)
if offset%int64(chunkHeaderLen+p.chunkLen) != 0 {
return f, fmt.Errorf(
"size of series file for fingerprint %v is %d, which is not a multiple of the chunk length %d",
fp, offset, chunkHeaderLen+p.chunkLen,
)
}
return f, err
} }
func (p *persistence) openChunkFileForReading(fp clientmodel.Fingerprint) (*os.File, error) { func (p *persistence) openChunkFileForReading(fp clientmodel.Fingerprint) (*os.File, error) {

View file

@ -99,10 +99,6 @@ func (p *memorySeriesPreloader) GetMetricRangeAtInterval(fp clientmodel.Fingerpr
// Close implements Preloader. // Close implements Preloader.
func (p *memorySeriesPreloader) Close() { func (p *memorySeriesPreloader) Close() {
// TODO: Idea about a primitive but almost free heuristic to not evict
// "recently used" chunks: Do not unpin the chunks right here, but hand
// over the pinnedChunkDescs to a manager that will delay the unpinning
// based on time and memory pressure.
for _, cd := range p.pinnedChunkDescs { for _, cd := range p.pinnedChunkDescs {
cd.unpin(p.storage.evictRequests) cd.unpin(p.storage.evictRequests)
} }

View file

@ -281,8 +281,7 @@ func (s *memorySeries) preloadChunks(indexes []int, mss *memorySeriesStorage) ([
panic("requested loading chunks from persistence in a situation where we must not have persisted data for chunk descriptors in memory") panic("requested loading chunks from persistence in a situation where we must not have persisted data for chunk descriptors in memory")
} }
fp := s.metric.Fingerprint() fp := s.metric.Fingerprint()
// TODO: Remove law-of-Demeter violation? chunks, err := mss.loadChunks(fp, loadIndexes, s.chunkDescsOffset)
chunks, err := mss.persistence.loadChunks(fp, loadIndexes, s.chunkDescsOffset)
if err != nil { if err != nil {
// Unpin the chunks since we won't return them as pinned chunks now. // Unpin the chunks since we won't return them as pinned chunks now.
for _, cd := range pinnedChunkDescs { for _, cd := range pinnedChunkDescs {
@ -343,8 +342,7 @@ func (s *memorySeries) preloadChunksForRange(
firstChunkDescTime = s.chunkDescs[0].firstTime() firstChunkDescTime = s.chunkDescs[0].firstTime()
} }
if s.chunkDescsOffset != 0 && from.Before(firstChunkDescTime) { if s.chunkDescsOffset != 0 && from.Before(firstChunkDescTime) {
// TODO: Remove law-of-demeter violation? cds, err := mss.loadChunkDescs(fp, firstChunkDescTime)
cds, err := mss.persistence.loadChunkDescs(fp, firstChunkDescTime)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View file

@ -774,6 +774,16 @@ func (s *memorySeriesStorage) purgeSeries(fp clientmodel.Fingerprint, beforeTime
s.persistence.updateArchivedTimeRange(fp, newFirstTime, lastTime) s.persistence.updateArchivedTimeRange(fp, newFirstTime, lastTime)
} }
// See persistence.loadChunks for detailed explanation.
func (s *memorySeriesStorage) loadChunks(fp clientmodel.Fingerprint, indexes []int, indexOffset int) ([]chunk, error) {
return s.persistence.loadChunks(fp, indexes, indexOffset)
}
// See persistence.loadChunkDescs for detailed explanation.
func (s *memorySeriesStorage) loadChunkDescs(fp clientmodel.Fingerprint, beforeTime clientmodel.Timestamp) ([]*chunkDesc, error) {
return s.persistence.loadChunkDescs(fp, beforeTime)
}
// To expose persistQueueCap as metric: // To expose persistQueueCap as metric:
var ( var (
persistQueueCapDesc = prometheus.NewDesc( persistQueueCapDesc = prometheus.NewDesc(