diff --git a/rules/ast/ast.go b/rules/ast/ast.go
index cf720eaed..79cc768f2 100644
--- a/rules/ast/ast.go
+++ b/rules/ast/ast.go
@@ -193,7 +193,6 @@ type (
 		iterators    map[clientmodel.Fingerprint]local.SeriesIterator
 		metrics      map[clientmodel.Fingerprint]clientmodel.Metric
 		// Fingerprints are populated from label matchers at query analysis time.
-		// TODO: do we still need these?
 		fingerprints clientmodel.Fingerprints
 	}
 
@@ -234,7 +233,6 @@ type (
 		iterators    map[clientmodel.Fingerprint]local.SeriesIterator
 		metrics      map[clientmodel.Fingerprint]clientmodel.Metric
 		// Fingerprints are populated from label matchers at query analysis time.
-		// TODO: do we still need these?
 		fingerprints clientmodel.Fingerprints
 		interval     time.Duration
 	}
diff --git a/rules/ast/view_adapter.go b/rules/ast/view_adapter.go
deleted file mode 100644
index dc18611e8..000000000
--- a/rules/ast/view_adapter.go
+++ /dev/null
@@ -1,16 +0,0 @@
-// Copyright 2013 Prometheus Team
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package ast
-
-// TODO: remove file.
diff --git a/storage/local/chunk.go b/storage/local/chunk.go
index c0f47d467..0145e2e15 100644
--- a/storage/local/chunk.go
+++ b/storage/local/chunk.go
@@ -45,9 +45,7 @@ type chunkDesc struct {
 func newChunkDesc(c chunk) *chunkDesc {
 	chunkOps.WithLabelValues(createAndPin).Inc()
 	atomic.AddInt64(&numMemChunks, 1)
-	// TODO: numMemChunkDescs is actually never read except during metrics
-	// collection. Turn it into a real metric.
-	atomic.AddInt64(&numMemChunkDescs, 1)
+	numMemChunkDescs.Inc()
 	return &chunkDesc{chunk: c, refCount: 1}
 }
 
diff --git a/storage/local/instrumentation.go b/storage/local/instrumentation.go
index 58f6125a2..9d11fbfeb 100644
--- a/storage/local/instrumentation.go
+++ b/storage/local/instrumentation.go
@@ -37,6 +37,12 @@ var (
 		},
 		[]string{opTypeLabel},
 	)
+	numMemChunkDescs = prometheus.NewGauge(prometheus.GaugeOpts{
+		Namespace: namespace,
+		Subsystem: subsystem,
+		Name:      "memory_chunkdescs",
+		Help:      "The current number of chunk descriptors in memory.",
+	})
 )
 
 const (
@@ -71,12 +77,13 @@
 func init() {
 	prometheus.MustRegister(chunkOps)
 	prometheus.MustRegister(chunkDescOps)
+	prometheus.MustRegister(numMemChunkDescs)
 }
 
 var (
-	// Global counters, also used internally, so not implemented as
+	// Global counter, also used internally, so not implemented as
 	// metrics. Collected in memorySeriesStorage.Collect.
-	numMemChunks, numMemChunkDescs int64
+	numMemChunks int64
 
 	// Metric descriptors for the above.
 	numMemChunksDesc = prometheus.NewDesc(
@@ -84,9 +91,4 @@
 		"The current number of chunks in memory, excluding cloned chunks (i.e. chunks without a descriptor).",
 		nil, nil,
 	)
-	numMemChunkDescsDesc = prometheus.NewDesc(
-		prometheus.BuildFQName(namespace, subsystem, "memory_chunkdescs"),
-		"The current number of chunk descriptors in memory.",
-		nil, nil,
-	)
 )
diff --git a/storage/local/persistence.go b/storage/local/persistence.go
index 513bf726b..f84ed1724 100644
--- a/storage/local/persistence.go
+++ b/storage/local/persistence.go
@@ -349,7 +349,26 @@ func (p *persistence) recoverFromCrash(fingerprintToSeries map[clientmodel.Finge
 	return nil
 }
 
-// TODO: Document.
+// sanitizeSeries sanitizes a series based on its series file as defined by the provided directory and FileInfo.
+// The method returns the fingerprint as derived from the directory and file name, and whether the provided
+// file has been sanitized. A file that failed to be sanitized is deleted, if possible.
+//
+// The following steps are performed:
+//
+// - A file whose name doesn't comply with the naming scheme of a series file is simply deleted.
+//
+// - If the size of the series file isn't a multiple of the chunk size, extraneous bytes are truncated.
+// If the truncation fails, the file is deleted instead.
+//
+// - A file that is empty (after truncation) is deleted.
+//
+// - A series that is not archived (i.e. it is in the fingerprintToSeries map) is checked for consistency of
+// its various parameters (like head-chunk persistence state, offset of chunkDescs etc.). In particular,
+// overlap of an in-memory head chunk with the most recent persisted chunk is checked. Inconsistencies
+// are rectified.
+//
+// - A series that is archived (i.e. it is not in the fingerprintToSeries map) is checked for its presence
+// in the index of archived series. If it cannot be found there, it is deleted.
 func (p *persistence) sanitizeSeries(dirname string, fi os.FileInfo, fingerprintToSeries map[clientmodel.Fingerprint]*memorySeries) (clientmodel.Fingerprint, bool) {
 	filename := path.Join(dirname, fi.Name())
 	purge := func() {
@@ -364,7 +383,11 @@ func (p *persistence) sanitizeSeries(dirname string, fi os.FileInfo, fingerprint
 		purge()
 		return fp, false
 	}
-	fp.LoadFromString(path.Base(dirname) + fi.Name()[:fpLen-seriesDirNameLen]) // TODO: Panics if that doesn't parse as hex.
+	if err := fp.LoadFromString(path.Base(dirname) + fi.Name()[:fpLen-seriesDirNameLen]); err != nil {
+		glog.Warningf("Error parsing file name %s: %s", filename, err)
+		purge()
+		return fp, false
+	}
 	bytesToTrim := fi.Size() % int64(p.chunkLen+chunkHeaderLen)
 	chunksInFile := int(fi.Size()) / (p.chunkLen + chunkHeaderLen)
 
@@ -676,9 +699,6 @@ func (p *persistence) persistChunk(fp clientmodel.Fingerprint, c chunk) (int, er
 // each index in indexes. It is the caller's responsibility to not persist or
 // drop anything for the same fingerprint concurrently.
 func (p *persistence) loadChunks(fp clientmodel.Fingerprint, indexes []int, indexOffset int) ([]chunk, error) {
-	// TODO: we need to verify at some point that file length is a multiple of
-	// the chunk size. When is the best time to do this, and where to remember
-	// it? Right now, we only do it when loading chunkDescs.
 	f, err := p.openChunkFileForReading(fp)
 	if err != nil {
 		return nil, err
 	}
@@ -731,15 +751,11 @@ func (p *persistence) loadChunkDescs(fp clientmodel.Fingerprint, beforeTime clie
 	}
 	totalChunkLen := chunkHeaderLen + p.chunkLen
 	if fi.Size()%int64(totalChunkLen) != 0 {
-		// TODO: record number of encountered corrupt series files in a metric?
-
-		// Truncate the file size to the nearest multiple of chunkLen.
-		truncateTo := fi.Size() - fi.Size()%int64(totalChunkLen)
-		glog.Infof("Bad series file size for %s: %d bytes (no multiple of %d). Truncating to %d bytes.", fp, fi.Size(), totalChunkLen, truncateTo)
-		// TODO: this doesn't work, as this is a read-only file handle.
-		if err := f.Truncate(truncateTo); err != nil {
-			return nil, err
-		}
+		p.setDirty(true)
+		return nil, fmt.Errorf(
+			"size of series file for fingerprint %v is %d, which is not a multiple of the chunk length %d",
+			fp, fi.Size(), totalChunkLen,
+		)
 	}
 	numChunks := int(fi.Size()) / totalChunkLen
 
@@ -766,13 +782,49 @@ func (p *persistence) loadChunkDescs(fp clientmodel.Fingerprint, beforeTime clie
 		cds = append(cds, cd)
 	}
 	chunkDescOps.WithLabelValues(load).Add(float64(len(cds)))
-	atomic.AddInt64(&numMemChunkDescs, int64(len(cds)))
+	numMemChunkDescs.Add(float64(len(cds)))
 	return cds, nil
 }
 
 // checkpointSeriesMapAndHeads persists the fingerprint to memory-series mapping
 // and all open (non-full) head chunks. Do not call concurrently with
 // loadSeriesMapAndHeads.
+//
+// Description of the file format:
+//
+// (1) Magic string (const headsMagicString).
+//
+// (2) Varint-encoded format version (const headsFormatVersion).
+//
+// (3) Number of series in checkpoint as big-endian uint64.
+//
+// (4) Repeated once per series:
+//
+// (4.1) A flag byte, see flag constants above.
+//
+// (4.2) The fingerprint as big-endian uint64.
+//
+// (4.3) The metric as defined by codable.Metric.
+//
+// (4.4) The varint-encoded chunkDescsOffset.
+//
+// (4.5) The varint-encoded savedFirstTime.
+//
+// (4.6) The varint-encoded number of chunk descriptors.
+//
+// (4.7) Repeated once per chunk descriptor, oldest to most recent:
+//
+// (4.7.1) The varint-encoded first time.
+//
+// (4.7.2) The varint-encoded last time.
+//
+// (4.8) Exception to 4.7: If the most recent chunk is a non-persisted head chunk,
+// the following is persisted instead of the most recent chunk descriptor:
+//
+// (4.8.1) A byte defining the chunk type.
+//
+// (4.8.2) The head chunk itself, marshaled with the marshal() method.
+//
 func (p *persistence) checkpointSeriesMapAndHeads(fingerprintToSeries *seriesMap, fpLocker *fingerprintLocker) (err error) {
 	glog.Info("Checkpointing in-memory metrics and head chunks...")
 	begin := time.Now()
@@ -916,7 +968,7 @@ func (p *persistence) loadSeriesMapAndHeads() (sm *seriesMap, err error) {
 		}
 		if err == nil {
 			atomic.AddInt64(&numMemChunks, chunksTotal)
-			atomic.AddInt64(&numMemChunkDescs, chunkDescsTotal)
+			numMemChunkDescs.Add(float64(chunkDescsTotal))
 		}
 	}()
 
@@ -1328,7 +1380,18 @@ func (p *persistence) openChunkFileForWriting(fp clientmodel.Fingerprint) (*os.F
 	if err := os.MkdirAll(p.dirNameForFingerprint(fp), 0700); err != nil {
 		return nil, err
 	}
-	return os.OpenFile(p.fileNameForFingerprint(fp), os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0640)
+	f, err := os.OpenFile(p.fileNameForFingerprint(fp), os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0640)
+	if err != nil {
+		return f, err
+	}
+	offset, err := f.Seek(0, os.SEEK_CUR)
+	if offset%int64(chunkHeaderLen+p.chunkLen) != 0 {
+		return f, fmt.Errorf(
+			"size of series file for fingerprint %v is %d, which is not a multiple of the chunk length %d",
+			fp, offset, chunkHeaderLen+p.chunkLen,
+		)
+	}
+	return f, err
 }
 
 func (p *persistence) openChunkFileForReading(fp clientmodel.Fingerprint) (*os.File, error) {
diff --git a/storage/local/preload.go b/storage/local/preload.go
index 1e1cd6c96..ef5f110e0 100644
--- a/storage/local/preload.go
+++ b/storage/local/preload.go
@@ -99,10 +99,6 @@ func (p *memorySeriesPreloader) GetMetricRangeAtInterval(fp clientmodel.Fingerpr
 
 // Close implements Preloader.
 func (p *memorySeriesPreloader) Close() {
-	// TODO: Idea about a primitive but almost free heuristic to not evict
-	// "recently used" chunks: Do not unpin the chunks right here, but hand
-	// over the pinnedChunkDescs to a manager that will delay the unpinning
-	// based on time and memory pressure.
 	for _, cd := range p.pinnedChunkDescs {
 		cd.unpin(p.storage.evictRequests)
 	}
diff --git a/storage/local/series.go b/storage/local/series.go
index 8910cbd6f..9d2286067 100644
--- a/storage/local/series.go
+++ b/storage/local/series.go
@@ -235,7 +235,7 @@ func (s *memorySeries) evictChunkDescs(iOldestNotEvicted int) {
 	lenEvicted := len(s.chunkDescs) - lenToKeep
 	s.chunkDescsOffset += lenEvicted
 	chunkDescOps.WithLabelValues(evict).Add(float64(lenEvicted))
-	atomic.AddInt64(&numMemChunkDescs, -int64(lenEvicted))
+	numMemChunkDescs.Sub(float64(lenEvicted))
 	s.chunkDescs = append(
 		make([]*chunkDesc, 0, lenToKeep),
 		s.chunkDescs[lenEvicted:]...,
@@ -257,7 +257,7 @@ func (s *memorySeries) purgeOlderThan(t clientmodel.Timestamp) (int, bool) {
 	}
 	if keepIdx > 0 {
 		s.chunkDescs = append(make([]*chunkDesc, 0, len(s.chunkDescs)-keepIdx), s.chunkDescs[keepIdx:]...)
-		atomic.AddInt64(&numMemChunkDescs, -int64(keepIdx))
+		numMemChunkDescs.Sub(float64(keepIdx))
 	}
 	return keepIdx, len(s.chunkDescs) == 0
 }
@@ -281,8 +281,7 @@ func (s *memorySeries) preloadChunks(indexes []int, mss *memorySeriesStorage) ([
 		panic("requested loading chunks from persistence in a situation where we must not have persisted data for chunk descriptors in memory")
 	}
 	fp := s.metric.Fingerprint()
-	// TODO: Remove law-of-Demeter violation?
-	chunks, err := mss.persistence.loadChunks(fp, loadIndexes, s.chunkDescsOffset)
+	chunks, err := mss.loadChunks(fp, loadIndexes, s.chunkDescsOffset)
 	if err != nil {
 		// Unpin the chunks since we won't return them as pinned chunks now.
 		for _, cd := range pinnedChunkDescs {
@@ -343,8 +342,7 @@ func (s *memorySeries) preloadChunksForRange(
 		firstChunkDescTime = s.chunkDescs[0].firstTime()
 	}
 	if s.chunkDescsOffset != 0 && from.Before(firstChunkDescTime) {
-		// TODO: Remove law-of-demeter violation?
-		cds, err := mss.persistence.loadChunkDescs(fp, firstChunkDescTime)
+		cds, err := mss.loadChunkDescs(fp, firstChunkDescTime)
 		if err != nil {
 			return nil, err
 		}
diff --git a/storage/local/storage.go b/storage/local/storage.go
index 20d807083..7544e0149 100644
--- a/storage/local/storage.go
+++ b/storage/local/storage.go
@@ -774,6 +774,16 @@ func (s *memorySeriesStorage) purgeSeries(fp clientmodel.Fingerprint, beforeTime
 	s.persistence.updateArchivedTimeRange(fp, newFirstTime, lastTime)
 }
 
+// See persistence.loadChunks for detailed explanation.
+func (s *memorySeriesStorage) loadChunks(fp clientmodel.Fingerprint, indexes []int, indexOffset int) ([]chunk, error) {
+	return s.persistence.loadChunks(fp, indexes, indexOffset)
+}
+
+// See persistence.loadChunkDescs for detailed explanation.
+func (s *memorySeriesStorage) loadChunkDescs(fp clientmodel.Fingerprint, beforeTime clientmodel.Timestamp) ([]*chunkDesc, error) {
+	return s.persistence.loadChunkDescs(fp, beforeTime)
+}
+
 // To expose persistQueueCap as metric:
 var (
 	persistQueueCapDesc = prometheus.NewDesc(
@@ -801,7 +811,6 @@ func (s *memorySeriesStorage) Describe(ch chan<- *prometheus.Desc) {
 	ch <- persistQueueCapDesc
 	ch <- numMemChunksDesc
-	ch <- numMemChunkDescsDesc
 }
 
 // Collect implements prometheus.Collector.
@@ -820,6 +829,4 @@ func (s *memorySeriesStorage) Collect(ch chan<- prometheus.Metric) {
 
 	count := atomic.LoadInt64(&numMemChunks)
 	ch <- prometheus.MustNewConstMetric(numMemChunksDesc, prometheus.GaugeValue, float64(count))
-	count = atomic.LoadInt64(&numMemChunkDescs)
-	ch <- prometheus.MustNewConstMetric(numMemChunkDescsDesc, prometheus.GaugeValue, float64(count))
 }
diff --git a/storage/local/test_helpers.go b/storage/local/test_helpers.go
index 7e4538387..91b1156c3 100644
--- a/storage/local/test_helpers.go
+++ b/storage/local/test_helpers.go
@@ -11,10 +11,14 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+// NOTE ON FILENAME: Do not rename this file helpers_test.go (which might appear
+// an obvious choice). We need NewTestStorage in tests outside of the local
+// package, too. On the other hand, moving NewTestStorage into its own package
+// would cause circular dependencies in the tests in package local.
+
 package local
 
 import (
-	"testing"
 	"time"
 
 	"github.com/prometheus/prometheus/utility/test"
@@ -33,7 +37,7 @@ func (t *testStorageCloser) Close() {
 // NewTestStorage creates a storage instance backed by files in a temporary
 // directory. The returned storage is already in serving state. Upon closing the
 // returned test.Closer, the temporary directory is cleaned up.
-func NewTestStorage(t testing.TB) (Storage, test.Closer) {
+func NewTestStorage(t test.T) (Storage, test.Closer) {
 	directory := test.NewTemporaryDirectory("test_storage", t)
 	o := &MemorySeriesStorageOptions{
 		MemoryChunks: 1000000,
diff --git a/utility/test/directory.go b/utility/test/directory.go
index 1572f74df..994439591 100644
--- a/utility/test/directory.go
+++ b/utility/test/directory.go
@@ -16,7 +16,6 @@ package test
 import (
 	"io/ioutil"
 	"os"
-	"testing"
 )
 
 const (
@@ -51,12 +50,21 @@ type (
 	// their interactions.
 	temporaryDirectory struct {
 		path   string
-		tester testing.TB
+		tester T
 	}
 
 	callbackCloser struct {
 		fn func()
 	}
+
+	// T implements the needed methods of testing.TB so that we do not need
+	// to actually import testing (which has the side effect of adding all
+	// the test flags, which we do not want in non-test binaries even if
+	// they make use of these utilities for some reason).
+	T interface {
+		Fatal(args ...interface{})
+		Fatalf(format string, args ...interface{})
+	}
 )
 
 func (c nilCloser) Close() {
@@ -90,7 +98,7 @@ func (t temporaryDirectory) Path() string {
 
 // NewTemporaryDirectory creates a new temporary directory for transient POSIX
 // activities.
-func NewTemporaryDirectory(name string, t testing.TB) (handler TemporaryDirectory) {
+func NewTemporaryDirectory(name string, t T) (handler TemporaryDirectory) {
 	var (
 		directory string
 		err       error
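
Illustration (not part of the patch): the recurring numMemChunkDescs change above swaps a hand-maintained atomic counter, exposed through a custom Desc in Describe/Collect, for an ordinary prometheus Gauge that is registered once and updated with Inc/Add/Sub at the call sites. A minimal, self-contained Go sketch of the two styles follows; the namespace, subsystem, and metric names used here ("example", "subsys", "things") are placeholders, not identifiers from the Prometheus code base.

// Two ways to expose an in-memory count as a Prometheus metric.
package main

import (
	"sync/atomic"

	"github.com/prometheus/client_golang/prometheus"
)

// Old style: a package-level atomic counter plus a hand-written Desc that has
// to be pushed through custom Describe/Collect implementations.
var (
	numThings int64

	numThingsDesc = prometheus.NewDesc(
		prometheus.BuildFQName("example", "subsys", "things_old"),
		"The current number of things in memory (old style).",
		nil, nil,
	)
)

func collectOldStyle(ch chan<- prometheus.Metric) {
	count := atomic.LoadInt64(&numThings)
	ch <- prometheus.MustNewConstMetric(numThingsDesc, prometheus.GaugeValue, float64(count))
}

// New style: a real Gauge. The default registry collects it, and call sites
// simply use Inc/Add/Sub.
var numThingsGauge = prometheus.NewGauge(prometheus.GaugeOpts{
	Namespace: "example",
	Subsystem: "subsys",
	Name:      "things",
	Help:      "The current number of things in memory.",
})

func init() {
	prometheus.MustRegister(numThingsGauge)
}

func main() {
	atomic.AddInt64(&numThings, 1) // old style bookkeeping
	numThingsGauge.Inc()           // new style bookkeeping

	// collectOldStyle would have to be wired into a Collector's Collect
	// method; the gauge needs no such plumbing.
	_ = collectOldStyle
}

The gauge variant needs no Describe/Collect code at all, which is exactly why the numMemChunkDescsDesc descriptor and the corresponding lines in memorySeriesStorage.Collect could be deleted.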
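Illustration (not part of the patch): the new doc comment on checkpointSeriesMapAndHeads describes the checkpoint as starting with a magic string, a varint-encoded format version, and a big-endian uint64 series count, followed by per-series records. The sketch below writes only that header with the standard library; the constant values and the function name writeCheckpointHeader are invented for the example and do not claim to match the real headsMagicString or headsFormatVersion.

// Writing the first three fields of a checkpoint-style header.
package main

import (
	"bufio"
	"encoding/binary"
	"os"
)

const (
	magicString   = "ExampleHeadsV1" // placeholder, not the real headsMagicString
	formatVersion = 1                // placeholder, not the real headsFormatVersion
)

func writeCheckpointHeader(w *bufio.Writer, numSeries uint64) error {
	// (1) Magic string.
	if _, err := w.WriteString(magicString); err != nil {
		return err
	}
	// (2) Varint-encoded format version.
	var buf [binary.MaxVarintLen64]byte
	n := binary.PutVarint(buf[:], formatVersion)
	if _, err := w.Write(buf[:n]); err != nil {
		return err
	}
	// (3) Number of series in the checkpoint as big-endian uint64.
	return binary.Write(w, binary.BigEndian, numSeries)
}

func main() {
	w := bufio.NewWriter(os.Stdout)
	if err := writeCheckpointHeader(w, 42); err != nil {
		panic(err)
	}
	w.Flush()
}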
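Illustration (not part of the patch): the new test.T interface lets utility/test and test_helpers.go drop their import of the testing package, so non-test binaries linking these helpers do not pick up testing's command-line flags. A minimal sketch of the pattern, with a hypothetical helper NewTempDir standing in for NewTemporaryDirectory; since *testing.T and testing.TB already provide Fatal and Fatalf, test code can keep passing them unchanged.

// A minimal testing-free helper package in the style of utility/test.
package test

import "io/ioutil"

// T is the small subset of testing.TB that the helpers actually need.
// Declaring it here keeps the testing package (and its flag registration)
// out of non-test binaries that link these utilities.
type T interface {
	Fatal(args ...interface{})
	Fatalf(format string, args ...interface{})
}

// NewTempDir is a hypothetical helper in the spirit of NewTemporaryDirectory:
// it only needs the error-reporting half of the testing API.
func NewTempDir(name string, t T) string {
	dir, err := ioutil.TempDir("", name)
	if err != nil {
		t.Fatalf("could not create temporary directory for %q: %s", name, err)
	}
	return dir
}

Outside of tests, any small adapter that implements Fatal and Fatalf (for example, one that panics or logs) can be passed instead.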