Add crash recovery.

Fix the behavior when a preload for a non-existent series is requested.

Instead of returning an error (which triggers a panic further up),
simply count those incidents. They can happen regularly; we just want
to know if they happen too frequently, because that would mean the
indexing is behind or broken.

Change-Id: I4b2d1b93c4146eeea897d188063cb9574a270f8b
Bjoern Rabenstein 2014-11-05 20:02:45 +01:00
parent 7a9efc9c59
commit 904acd43da
6 changed files with 560 additions and 74 deletions
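
The gist of the preload change, as a minimal standalone sketch: count the incident in a Prometheus counter and return an empty result instead of an error. The counter name matches the metric added in this commit; the preload helper, its signature, and the use of the current client_golang API are illustrative assumptions, not the actual storage code.

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

// Counts preload requests that refer to a series we know nothing about.
var invalidPreloadRequests = prometheus.NewCounter(prometheus.CounterOpts{
	Name: "invalid_preload_requests_total",
	Help: "Preload requests referring to a non-existent series.",
})

// preload is a hypothetical stand-in for preloadChunksForRange.
func preload(seriesExists bool) ([]int, error) {
	if !seriesExists {
		// Before this commit: return an error, which panicked further up.
		// Now: count the incident and return an empty result.
		invalidPreloadRequests.Inc()
		return nil, nil
	}
	return []int{1, 2, 3}, nil // Stand-in for the pinned chunk descriptors.
}

func main() {
	prometheus.MustRegister(invalidPreloadRequests)
	chunks, err := preload(false)
	fmt.Println(chunks, err) // [] <nil>: the caller simply sees an empty result.
}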

main.go

@@ -60,6 +60,8 @@ var (
checkpointInterval = flag.Duration("storage.checkpointInterval", 5*time.Minute, "The period at which the in-memory index of time series is checkpointed.")
storageDirty = flag.Bool("storage.dirty", false, "If set, the storage layer will perform crash recovery even if the last shutdown appears to be clean.")
notificationQueueCapacity = flag.Int("alertmanager.notificationQueueCapacity", 100, "The size of the queue for pending alert manager notifications.")
printVersion = flag.Bool("version", false, "print version information")
@@ -107,6 +109,7 @@ func NewPrometheus() *prometheus {
PersistencePurgeInterval: *storagePurgeInterval,
PersistenceRetentionPeriod: *storageRetentionPeriod,
CheckpointInterval: *checkpointInterval,
Dirty: *storageDirty,
}
memStorage, err := local.NewMemorySeriesStorage(o)
if err != nil {

storage/local/index/index.go

@@ -18,6 +18,7 @@ package index
import (
"flag"
"os"
"path"
clientmodel "github.com/prometheus/client_golang/model"
@@ -174,6 +175,12 @@ func NewLabelNameLabelValuesIndex(basePath string) (*LabelNameLabelValuesIndex,
}, nil
}
// DeleteLabelNameLabelValuesIndex deletes the LevelDB-backed
// LabelNameLabelValuesIndex. Use it only on an index that has not yet been opened.
func DeleteLabelNameLabelValuesIndex(basePath string) error {
return os.RemoveAll(path.Join(basePath, labelNameToLabelValuesDir))
}
// LabelPairFingerprintsMapping is an in-memory map of label pairs to
// fingerprints.
type LabelPairFingerprintsMapping map[metric.LabelPair]codable.FingerprintSet
@@ -242,6 +249,12 @@ func NewLabelPairFingerprintIndex(basePath string) (*LabelPairFingerprintIndex,
}, nil
}
// DeleteLabelPairFingerprintIndex deletes the LevelDB-backed
// LabelPairFingerprintIndex. Use it only on an index that has not yet been opened.
func DeleteLabelPairFingerprintIndex(basePath string) error {
return os.RemoveAll(path.Join(basePath, labelPairToFingerprintsDir))
}
// FingerprintTimeRangeIndex models a database tracking the time ranges
// of metrics by their fingerprints.
type FingerprintTimeRangeIndex struct {
@@ -259,13 +272,6 @@ func (i *FingerprintTimeRangeIndex) Lookup(fp clientmodel.Fingerprint) (firstTim
return tr.First, tr.Last, ok, err
}
// Has returns true if the given fingerprint is present.
//
// This method is goroutine-safe.
func (i *FingerprintTimeRangeIndex) Has(fp clientmodel.Fingerprint) (ok bool, err error) {
return i.KeyValueStore.Has(codable.Fingerprint(fp))
}
// NewFingerprintTimeRangeIndex returns a LevelDB-backed
// FingerprintTimeRangeIndex ready to use.
func NewFingerprintTimeRangeIndex(basePath string) (*FingerprintTimeRangeIndex, error) {

storage/local/persistence.go

@@ -18,8 +18,10 @@ import (
"encoding/binary"
"fmt"
"io"
"math"
"os"
"path"
"strings"
"sync"
"sync/atomic"
"time"
@@ -52,7 +54,7 @@ const (
indexingMaxBatchSize = 1024 * 1024
indexingBatchTimeout = 500 * time.Millisecond // Commit batch when idle for that long.
indexingQueueCapacity = 1024
indexingQueueCapacity = 1024 * 16
)
const (
@@ -103,14 +105,17 @@ type persistence struct {
indexingBatchSizes prometheus.Summary
indexingBatchLatency prometheus.Summary
checkpointDuration prometheus.Gauge
dirtyMtx sync.Mutex // Protects dirty and becameDirty.
dirty, becameDirty bool
}
// newPersistence returns a newly allocated persistence backed by local disk storage, ready to use.
func newPersistence(basePath string, chunkLen int) (*persistence, error) {
func newPersistence(basePath string, chunkLen int, dirty bool) (*persistence, error) {
if err := os.MkdirAll(basePath, 0700); err != nil {
return nil, err
}
var err error
archivedFingerprintToMetrics, err := index.NewFingerprintMetricIndex(basePath)
if err != nil {
return nil, err
@@ -119,14 +124,6 @@ func newPersistence(basePath string, chunkLen int) (*persistence, error) {
if err != nil {
return nil, err
}
labelPairToFingerprints, err := index.NewLabelPairFingerprintIndex(basePath)
if err != nil {
return nil, err
}
labelNameToLabelValues, err := index.NewLabelNameLabelValuesIndex(basePath)
if err != nil {
return nil, err
}
p := &persistence{
basePath: basePath,
@@ -134,8 +131,6 @@ func newPersistence(basePath string, chunkLen int) (*persistence, error) {
archivedFingerprintToMetrics: archivedFingerprintToMetrics,
archivedFingerprintToTimeRange: archivedFingerprintToTimeRange,
labelPairToFingerprints: labelPairToFingerprints,
labelNameToLabelValues: labelNameToLabelValues,
indexingQueue: make(chan indexingOp, indexingQueueCapacity),
indexingStopped: make(chan struct{}),
@@ -178,7 +173,36 @@ func newPersistence(basePath string, chunkLen int) (*persistence, error) {
Name: "checkpoint_duration_milliseconds",
Help: "The duration (in milliseconds) it took to checkpoint in-memory metrics and head chunks.",
}),
dirty: dirty,
}
if dirtyFile, err := os.OpenFile(p.dirtyFileName(), os.O_CREATE|os.O_EXCL, 0666); err == nil {
dirtyFile.Close()
} else if os.IsExist(err) {
p.dirty = true
} else {
return nil, err
}
if p.dirty {
// Blow away the label indexes. We'll rebuild them later.
if err := index.DeleteLabelPairFingerprintIndex(basePath); err != nil {
return nil, err
}
if err := index.DeleteLabelNameLabelValuesIndex(basePath); err != nil {
return nil, err
}
}
labelPairToFingerprints, err := index.NewLabelPairFingerprintIndex(basePath)
if err != nil {
return nil, err
}
labelNameToLabelValues, err := index.NewLabelNameLabelValuesIndex(basePath)
if err != nil {
return nil, err
}
p.labelPairToFingerprints = labelPairToFingerprints
p.labelNameToLabelValues = labelNameToLabelValues
go p.processIndexingQueue()
return p, nil
}
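
The crash detection above hinges on one idiom: the DIRTY file is created with O_CREATE|O_EXCL and removed again on a clean close, so finding it already present at start-up means the previous run never shut down cleanly. The same idiom in isolation, as a sketch using only the standard library (markDirty is a hypothetical helper, not part of this commit):

package main

import (
	"fmt"
	"os"
	"path/filepath"
)

// markDirty creates the DIRTY marker exclusively. If the file already exists,
// the previous run never removed it, i.e. the last shutdown was unclean.
func markDirty(basePath string) (wasDirty bool, err error) {
	f, err := os.OpenFile(filepath.Join(basePath, "DIRTY"), os.O_CREATE|os.O_EXCL, 0666)
	switch {
	case err == nil:
		f.Close()
		return false, nil
	case os.IsExist(err):
		return true, nil
	default:
		return false, err
	}
}

func main() {
	dir, _ := os.MkdirTemp("", "dirty-demo")
	defer os.RemoveAll(dir)
	fmt.Println(markDirty(dir)) // false <nil>: first start is clean.
	fmt.Println(markDirty(dir)) // true <nil>: marker still present, crash assumed.
}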
@@ -203,6 +227,374 @@ func (p *persistence) Collect(ch chan<- prometheus.Metric) {
ch <- p.checkpointDuration
}
// dirtyFileName returns the name of the (empty) file used to mark the
// persistence layer as dirty.
func (p *persistence) dirtyFileName() string {
return path.Join(p.basePath, "DIRTY")
}
// isDirty returns the dirty flag in a goroutine-safe way.
func (p *persistence) isDirty() bool {
p.dirtyMtx.Lock()
defer p.dirtyMtx.Unlock()
return p.dirty
}
// setDirty sets the dirty flag in a goroutine-safe way. Once the dirty flag has
// been set to true with this method, it cannot be set to false again. (If we
// became dirty during our runtime, there is no way back. If we were dirty from
// the start, a clean-up might make us clean again.)
func (p *persistence) setDirty(dirty bool) {
p.dirtyMtx.Lock()
defer p.dirtyMtx.Unlock()
if p.becameDirty {
return
}
p.dirty = dirty
if dirty {
p.becameDirty = true
glog.Error("The storage is now inconsistent. Restart Prometheus ASAP to initiate recovery.")
}
}
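
In effect, dirty and becameDirty form a one-way latch at runtime: setDirty(false) only has an effect as long as the process never became dirty. A minimal sketch of just those semantics (standard library only; the type name is made up):

package main

import (
	"fmt"
	"sync"
)

// dirtyLatch mirrors the dirty/becameDirty pair: Set(true) latches permanently,
// while Set(false) is honored only if we never became dirty at runtime.
type dirtyLatch struct {
	mtx                sync.Mutex
	dirty, becameDirty bool
}

func (l *dirtyLatch) Set(dirty bool) {
	l.mtx.Lock()
	defer l.mtx.Unlock()
	if l.becameDirty {
		return // Once dirty at runtime, there is no way back.
	}
	l.dirty = dirty
	if dirty {
		l.becameDirty = true
	}
}

func main() {
	var l dirtyLatch
	l.Set(true)
	l.Set(false) // Ignored: the latch has flipped.
	fmt.Println(l.dirty) // true
}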
// crashRecovery is called by loadSeriesMapAndHeads if the persistence appears
// to be dirty after the loading (either because the loading resulted in an
// error or because the persistence was dirty from the start).
func (p *persistence) crashRecovery(fingerprintToSeries map[clientmodel.Fingerprint]*memorySeries) error {
glog.Warning("Starting crash recovery. Prometheus is inoperational until complete.")
fpsSeen := map[clientmodel.Fingerprint]struct{}{}
count := 0
glog.Info("Scanning files.")
for i := 0; i < 256; i++ {
dirname := path.Join(p.basePath, fmt.Sprintf("%02x", i))
dir, err := os.Open(dirname)
if os.IsNotExist(err) {
continue
}
if err != nil {
return err
}
defer dir.Close()
var fis []os.FileInfo
for ; err != io.EOF; fis, err = dir.Readdir(1024) {
if err != nil {
return err
}
for _, fi := range fis {
fp, ok := p.sanitizeSeries(dirname, fi, fingerprintToSeries)
if ok {
fpsSeen[fp] = struct{}{}
}
count++
if count%10000 == 0 {
glog.Infof("%d files scanned.", count)
}
}
}
}
glog.Infof("File scan complete. %d fingerprints found.", len(fpsSeen))
glog.Info("Checking for series without series file.")
for fp, s := range fingerprintToSeries {
if _, seen := fpsSeen[fp]; !seen {
// fp exists in fingerprintToSeries, but has no representation on disk.
if s.headChunkPersisted {
// Oops, head chunk was persisted, but nothing on disk.
// Thus, we lost that series completely. Clean up the remnants.
delete(fingerprintToSeries, fp)
if err := p.dropArchivedMetric(fp); err != nil {
// Dropping the archived metric didn't work, so try
// to unindex it, just in case it's in the indexes.
p.unindexMetric(fp, s.metric)
}
glog.Warningf("Lost series detected: fingerprint %v, metric %v.", fp, s.metric)
continue
}
// If we are here, the only chunk we have is the head chunk.
// Adjust things accordingly.
if len(s.chunkDescs) > 1 || s.chunkDescsOffset != 0 {
glog.Warningf(
"Lost at least %d chunks for fingerprint %v, metric %v.",
len(s.chunkDescs)+s.chunkDescsOffset-1, fp, s.metric,
// If chunkDescsOffset is -1, this will underreport. Oh well...
)
s.chunkDescs = s.chunkDescs[len(s.chunkDescs)-1:]
s.chunkDescsOffset = 0
}
fpsSeen[fp] = struct{}{} // Add so that fpsSeen is complete.
}
}
glog.Info("Check for series without series file complete.")
if err := p.cleanUpArchiveIndexes(fingerprintToSeries, fpsSeen); err != nil {
return err
}
if err := p.rebuildLabelIndexes(fingerprintToSeries); err != nil {
return err
}
p.setDirty(false)
glog.Warning("Crash recovery complete.")
return nil
}
// sanitizeSeries reconciles a series file with the in-memory state. It returns
// the fingerprint encoded in the file name and true if the file (truncated to a
// whole number of chunks if needed) is consistent with an in-memory or archived
// series. Unparseable or orphaned files are deleted, and false is returned.
func (p *persistence) sanitizeSeries(dirname string, fi os.FileInfo, fingerprintToSeries map[clientmodel.Fingerprint]*memorySeries) (clientmodel.Fingerprint, bool) {
filename := path.Join(dirname, fi.Name())
purge := func() {
glog.Warningf("Deleting lost series file %s.", filename) // TODO: Move to lost+found directory?
os.Remove(filename)
}
var fp clientmodel.Fingerprint
if len(fi.Name()) != 17 || !strings.HasSuffix(fi.Name(), ".db") {
glog.Warningf("Unexpected series file name %s.", filename)
purge()
return fp, false
}
fp.LoadFromString(path.Base(dirname) + fi.Name()[:14]) // TODO: Panics if that doesn't parse as hex.
bytesToTrim := fi.Size() % int64(p.chunkLen+chunkHeaderLen)
chunksInFile := int(fi.Size()) / (p.chunkLen + chunkHeaderLen)
if bytesToTrim != 0 {
glog.Warningf(
"Truncating file %s to exactly %d chunks, trimming %d extraneous bytes.",
filename, chunksInFile, bytesToTrim,
)
f, err := os.OpenFile(filename, os.O_WRONLY, 0640)
if err != nil {
glog.Errorf("Could not open file %s: %s", filename, err)
purge()
return fp, false
}
if err := f.Truncate(fi.Size() - bytesToTrim); err != nil {
glog.Errorf("Failed to truncate file %s: %s", filename, err)
purge()
return fp, false
}
}
if chunksInFile == 0 {
glog.Warningf("No chunks left in file %s.", filename)
purge()
return fp, false
}
s, ok := fingerprintToSeries[fp]
if ok {
// This series is supposed to not be archived.
if s == nil {
panic("fingerprint mapped to nil pointer")
}
if bytesToTrim == 0 && s.chunkDescsOffset != -1 &&
((s.headChunkPersisted && chunksInFile == s.chunkDescsOffset+len(s.chunkDescs)) ||
(!s.headChunkPersisted && chunksInFile == s.chunkDescsOffset+len(s.chunkDescs)-1)) {
// Everything is consistent. We are good.
return fp, true
}
// If we are here, something's fishy.
if s.headChunkPersisted {
// This is the easy case as we don't have a head chunk
// in heads.db. Treat this series as a freshly
// unarchived one. No chunks or chunkDescs in memory, no
// current head chunk.
glog.Warningf(
"Treating recovered metric %v, fingerprint %v, as freshly unarchvied, with %d chunks in series file.",
s.metric, fp, chunksInFile,
)
s.chunkDescs = nil
s.chunkDescsOffset = -1
return fp, true
}
// This is the tricky one: We have a head chunk from heads.db,
// but the very same head chunk might already be in the series
// file. Strategy: Check the first time of both. If it is the
// same or newer, assume the latest chunk in the series file
// is the most recent head chunk. If not, keep the head chunk
// we got from heads.db.
// First, assume the head chunk is not yet persisted.
s.chunkDescs = s.chunkDescs[len(s.chunkDescs)-1:]
s.chunkDescsOffset = -1
// Load all the chunk descs (which assumes we have none from the future).
cds, err := p.loadChunkDescs(fp, clientmodel.Now())
if err != nil {
glog.Errorf(
"Failed to load chunk descriptors for metric %v, fingerprint %v: %s",
s.metric, fp, err,
)
purge()
return fp, false
}
if cds[len(cds)-1].firstTime().Before(s.head().firstTime()) {
s.chunkDescs = append(cds, s.chunkDescs...)
glog.Warningf(
"Recovered metric %v, fingerprint %v: recovered %d chunks from series file, recovered head chunk from checkpoint.",
s.metric, fp, chunksInFile,
)
} else {
glog.Warningf(
"Recovered metric %v, fingerprint %v: head chunk found among the %d recovered chunks in series file.",
s.metric, fp, chunksInFile,
)
s.chunkDescs = cds
s.headChunkPersisted = true
}
s.chunkDescsOffset = 0
return fp, true
}
// This series is supposed to be archived.
metric, err := p.getArchivedMetric(fp)
if err != nil {
glog.Errorf(
"Fingerprint %v assumed archived but couldn't be looked up in archived index: %s",
fp, err,
)
purge()
return fp, false
}
if metric == nil {
glog.Warningf(
"Fingerprint %v assumed archived but couldn't be found in archived index.",
fp,
)
purge()
return fp, false
}
// This series looks like a properly archived one.
return fp, true
}
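
The truncation arithmetic near the top of sanitizeSeries, worked through on made-up numbers (the chunk and header sizes below are illustrative, not the constants from this commit):

package main

import "fmt"

func main() {
	const chunkLen, chunkHeaderLen = 1024, 17            // Illustrative sizes only.
	fileSize := int64(3*(chunkLen+chunkHeaderLen) + 123) // Three whole chunks plus 123 stray bytes.
	bytesToTrim := fileSize % int64(chunkLen+chunkHeaderLen)
	chunksInFile := int(fileSize) / (chunkLen + chunkHeaderLen)
	fmt.Println(chunksInFile, bytesToTrim) // 3 123: keep three chunks, trim 123 bytes.
}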
func (p *persistence) cleanUpArchiveIndexes(
fpToSeries map[clientmodel.Fingerprint]*memorySeries,
fpsSeen map[clientmodel.Fingerprint]struct{},
) error {
glog.Info("Cleaning up archive indexes.")
var fp codable.Fingerprint
var m codable.Metric
count := 0
if err := p.archivedFingerprintToMetrics.ForEach(func(kv index.KeyValueAccessor) error {
count++
if count%10000 == 0 {
glog.Infof("%d archived metrics checked.", count)
}
if err := kv.Key(&fp); err != nil {
return err
}
_, fpSeen := fpsSeen[clientmodel.Fingerprint(fp)]
inMemory := false
if fpSeen {
_, inMemory = fpToSeries[clientmodel.Fingerprint(fp)]
}
if !fpSeen || inMemory {
if inMemory {
glog.Warningf("Archive clean-up: Fingerprint %v is not archived. Purging from archive indexes.", clientmodel.Fingerprint(fp))
}
if !fpSeen {
glog.Warningf("Archive clean-up: Fingerprint %v is unknown. Purging from archive indexes.", clientmodel.Fingerprint(fp))
}
if err := p.archivedFingerprintToMetrics.Delete(fp); err != nil {
return err
}
// Delete from timerange index, too.
p.archivedFingerprintToTimeRange.Delete(fp)
// TODO: We ignore errors here because fp might not be in the
// timerange index (which is fine), yet Delete would report
// that as an error. The Delete signature could be changed,
// like the Get signature, to distinguish a real error.
return nil
}
// fp is legitimately archived. Make sure it is in timerange index, too.
has, err := p.archivedFingerprintToTimeRange.Has(fp)
if err != nil {
return err
}
if has {
return nil // All good.
}
glog.Warningf("Archive clean-up: Fingerprint %v is not in time-range index. Unarchiving it for recovery.")
if err := p.archivedFingerprintToMetrics.Delete(fp); err != nil {
return err
}
if err := kv.Value(&m); err != nil {
return err
}
series := newMemorySeries(clientmodel.Metric(m), false, math.MinInt64)
cds, err := p.loadChunkDescs(clientmodel.Fingerprint(fp), clientmodel.Now())
if err != nil {
return err
}
series.chunkDescs = cds
series.chunkDescsOffset = 0
fpToSeries[clientmodel.Fingerprint(fp)] = series
return nil
}); err != nil {
return err
}
count = 0
if err := p.archivedFingerprintToTimeRange.ForEach(func(kv index.KeyValueAccessor) error {
count++
if count%10000 == 0 {
glog.Infof("%d archived time ranges checked.", count)
}
if err := kv.Key(&fp); err != nil {
return err
}
has, err := p.archivedFingerprintToMetrics.Has(fp)
if err != nil {
return err
}
if has {
return nil // All good.
}
glog.Warningf("Archive clean-up: Purging unknown fingerprint %v in time-range index.", fp)
if err := p.archivedFingerprintToTimeRange.Delete(fp); err != nil {
return err
}
return nil
}); err != nil {
return err
}
glog.Info("Clean-up of archive indexes complete.")
return nil
}
func (p *persistence) rebuildLabelIndexes(
fpToSeries map[clientmodel.Fingerprint]*memorySeries,
) error {
count := 0
glog.Info("Rebuilding label indexes.")
glog.Info("Indexing metrics in memory.")
for fp, s := range fpToSeries {
p.indexMetric(fp, s.metric)
count++
if count%10000 == 0 {
glog.Infof("%d metrics queued for indexing.", count)
}
}
glog.Info("Indexing archived metrics.")
var fp codable.Fingerprint
var m codable.Metric
if err := p.archivedFingerprintToMetrics.ForEach(func(kv index.KeyValueAccessor) error {
if err := kv.Key(&fp); err != nil {
return err
}
if err := kv.Value(&m); err != nil {
return err
}
p.indexMetric(clientmodel.Fingerprint(fp), clientmodel.Metric(m))
count++
if count%10000 == 0 {
glog.Infof("%d metrics queued for indexing.", count)
}
return nil
}); err != nil {
return err
}
glog.Info("All requests for rebuilding the label indexes queued. (Actual processing may lag behind.)")
return nil
}
// getFingerprintsForLabelPair returns the fingerprints for the given label
// pair. This method is goroutine-safe, but note that metrics queued
// for indexing with IndexMetric might not yet have made it into the index. (Same
@@ -492,63 +884,100 @@ func (p *persistence) checkpointSeriesMapAndHeads(fingerprintToSeries *seriesMap
}
// loadSeriesMapAndHeads loads the fingerprint to memory-series mapping and all
// open (non-full) head chunks. Only call this method during start-up while
// nothing else is running in storage land. This method is utterly
// goroutine-unsafe.
func (p *persistence) loadSeriesMapAndHeads() (*seriesMap, error) {
// open (non-full) head chunks. If recoverable corruption is detected, or if the
// dirty flag was set from the beginning, crash recovery is run, which might
// take a while. If an unrecoverable error is encountered, it is returned. Call
// this method during start-up while nothing else is running in storage
// land. This method is utterly goroutine-unsafe.
func (p *persistence) loadSeriesMapAndHeads() (sm *seriesMap, err error) {
var chunksTotal, chunkDescsTotal int64
fingerprintToSeries := make(map[clientmodel.Fingerprint]*memorySeries)
sm = &seriesMap{m: fingerprintToSeries}
defer func() {
if sm != nil && p.dirty {
glog.Warning("Persistence layer appears dirty.")
err = p.crashRecovery(fingerprintToSeries)
if err != nil {
sm = nil
}
}
if err == nil {
atomic.AddInt64(&numMemChunks, chunksTotal)
atomic.AddInt64(&numMemChunkDescs, chunkDescsTotal)
}
}()
f, err := os.Open(p.headsFileName())
if os.IsNotExist(err) {
return newSeriesMap(), nil
return sm, nil
}
if err != nil {
return nil, err
glog.Warning("Could not open heads file:", err)
p.dirty = true
return
}
defer f.Close()
r := bufio.NewReaderSize(f, fileBufSize)
buf := make([]byte, len(headsMagicString))
if _, err := io.ReadFull(r, buf); err != nil {
return nil, err
glog.Warning("Could not read from heads file:", err)
p.dirty = true
return sm, nil
}
magic := string(buf)
if magic != headsMagicString {
return nil, fmt.Errorf(
glog.Warningf(
"unexpected magic string, want %q, got %q",
headsMagicString, magic,
)
p.dirty = true
return
}
if version, err := binary.ReadVarint(r); version != headsFormatVersion || err != nil {
return nil, fmt.Errorf("unknown heads format version, want %d", headsFormatVersion)
glog.Warningf("unknown heads format version, want %d", headsFormatVersion)
p.dirty = true
return sm, nil
}
numSeries, err := codable.DecodeUint64(r)
if err != nil {
return nil, err
glog.Warning("Could not decode number of series:", err)
p.dirty = true
return sm, nil
}
fingerprintToSeries := make(map[clientmodel.Fingerprint]*memorySeries, numSeries)
for ; numSeries > 0; numSeries-- {
seriesFlags, err := r.ReadByte()
if err != nil {
return nil, err
glog.Warning("Could not read series flags:", err)
p.dirty = true
return sm, nil
}
headChunkPersisted := seriesFlags&flagHeadChunkPersisted != 0
fp, err := codable.DecodeUint64(r)
if err != nil {
return nil, err
glog.Warning("Could not decode fingerprint:", err)
p.dirty = true
return sm, nil
}
var metric codable.Metric
if err := metric.UnmarshalFromReader(r); err != nil {
return nil, err
glog.Warning("Could not decode metric:", err)
p.dirty = true
return sm, nil
}
chunkDescsOffset, err := binary.ReadVarint(r)
if err != nil {
return nil, err
glog.Warning("Could not decode chunk descriptor offset:", err)
p.dirty = true
return sm, nil
}
numChunkDescs, err := binary.ReadVarint(r)
if err != nil {
return nil, err
glog.Warning("Could not decode number of chunk descriptors:", err)
p.dirty = true
return sm, nil
}
chunkDescs := make([]*chunkDesc, numChunkDescs)
chunkDescsTotal += numChunkDescs
@@ -557,11 +986,15 @@ func (p *persistence) loadSeriesMapAndHeads() (*seriesMap, error) {
if headChunkPersisted || i < numChunkDescs-1 {
firstTime, err := binary.ReadVarint(r)
if err != nil {
return nil, err
glog.Warning("Could not decode first time:", err)
p.dirty = true
return sm, nil
}
lastTime, err := binary.ReadVarint(r)
if err != nil {
return nil, err
glog.Warning("Could not decode last time:", err)
p.dirty = true
return sm, nil
}
chunkDescs[i] = &chunkDesc{
chunkFirstTime: clientmodel.Timestamp(firstTime),
@@ -572,11 +1005,15 @@ func (p *persistence) loadSeriesMapAndHeads() (*seriesMap, error) {
chunksTotal++
chunkType, err := r.ReadByte()
if err != nil {
return nil, err
glog.Warning("Could not decode chunk type:", err)
p.dirty = true
return sm, nil
}
chunk := chunkForType(chunkType)
if err := chunk.unmarshal(r); err != nil {
return nil, err
glog.Warning("Could not decode chunk type:", err)
p.dirty = true
return sm, nil
}
chunkDescs[i] = newChunkDesc(chunk)
}
@@ -589,9 +1026,7 @@ func (p *persistence) loadSeriesMapAndHeads() (*seriesMap, error) {
headChunkPersisted: headChunkPersisted,
}
}
atomic.AddInt64(&numMemChunks, chunksTotal)
atomic.AddInt64(&numMemChunkDescs, chunkDescsTotal)
return &seriesMap{m: fingerprintToSeries}, nil
return sm, nil
}
// dropChunks deletes all chunks from a series whose last sample time is before
@@ -778,27 +1213,29 @@ func (p *persistence) dropArchivedMetric(fp clientmodel.Fingerprint) error {
}
// unarchiveMetric deletes an archived fingerprint and its metric, but (in
// contrast to dropArchivedMetric) does not un-index the metric. The method
// returns true if a metric was actually deleted. This method is goroutine-safe.
func (p *persistence) unarchiveMetric(fp clientmodel.Fingerprint) (bool, error) {
// contrast to dropArchivedMetric) does not un-index the metric. If a metric
// was actually deleted, the method returns true and the first time of the
// deleted metric. This method is goroutine-safe.
func (p *persistence) unarchiveMetric(fp clientmodel.Fingerprint) (bool, clientmodel.Timestamp, error) {
p.archiveMtx.Lock()
defer p.archiveMtx.Unlock()
has, err := p.archivedFingerprintToTimeRange.Has(fp)
firstTime, _, has, err := p.archivedFingerprintToTimeRange.Lookup(fp)
if err != nil || !has {
return false, err
return false, firstTime, err
}
if err := p.archivedFingerprintToMetrics.Delete(codable.Fingerprint(fp)); err != nil {
return false, err
return false, firstTime, err
}
if err := p.archivedFingerprintToTimeRange.Delete(codable.Fingerprint(fp)); err != nil {
return false, err
return false, firstTime, err
}
return true, nil
return true, firstTime, nil
}
// close flushes the indexing queue and other buffered data and releases any
// held resources.
// held resources. It also removes the dirty marker file if successful and if
// the persistence is currently not marked as dirty.
func (p *persistence) close() error {
close(p.indexingQueue)
<-p.indexingStopped
@@ -820,22 +1257,25 @@ func (p *persistence) close() error {
lastError = err
glog.Error("Error closing labelNameToLabelValues index DB: ", err)
}
if lastError == nil && !p.isDirty() {
lastError = os.Remove(p.dirtyFileName())
}
return lastError
}
func (p *persistence) dirNameForFingerprint(fp clientmodel.Fingerprint) string {
fpStr := fp.String()
return fmt.Sprintf("%s/%c%c", p.basePath, fpStr[0], fpStr[1])
return path.Join(p.basePath, fpStr[0:2])
}
func (p *persistence) fileNameForFingerprint(fp clientmodel.Fingerprint) string {
fpStr := fp.String()
return fmt.Sprintf("%s/%c%c/%s%s", p.basePath, fpStr[0], fpStr[1], fpStr[2:], seriesFileSuffix)
return path.Join(p.basePath, fpStr[0:2], fpStr[2:]+seriesFileSuffix)
}
func (p *persistence) tempFileNameForFingerprint(fp clientmodel.Fingerprint) string {
fpStr := fp.String()
return fmt.Sprintf("%s/%c%c/%s%s", p.basePath, fpStr[0], fpStr[1], fpStr[2:], seriesTempFileSuffix)
return path.Join(p.basePath, fpStr[0:2], fpStr[2:]+seriesTempFileSuffix)
}
func (p *persistence) openChunkFileForWriting(fp clientmodel.Fingerprint) (*os.File, error) {

storage/local/persistence_test.go

@@ -33,7 +33,7 @@ var (
func newTestPersistence(t *testing.T) (*persistence, test.Closer) {
dir := test.NewTemporaryDirectory("test_persistence", t)
p, err := newPersistence(dir.Path(), 1024)
p, err := newPersistence(dir.Path(), 1024, false)
if err != nil {
dir.Close()
t.Fatal(err)
@@ -183,9 +183,9 @@ func TestCheckpointAndLoadSeriesMapAndHeads(t *testing.T) {
fpLocker := newFingerprintLocker(10)
sm := newSeriesMap()
s1 := newMemorySeries(m1, true)
s2 := newMemorySeries(m2, false)
s3 := newMemorySeries(m3, false)
s1 := newMemorySeries(m1, true, 0)
s2 := newMemorySeries(m2, false, 0)
s3 := newMemorySeries(m3, false, 0)
s1.add(m1.Fingerprint(), &metric.SamplePair{Timestamp: 1, Value: 3.14})
s3.add(m1.Fingerprint(), &metric.SamplePair{Timestamp: 2, Value: 2.7})
s3.headChunkPersisted = true
@@ -269,14 +269,17 @@ func TestGetFingerprintsModifiedBefore(t *testing.T) {
}
}
unarchived, err := p.unarchiveMetric(1)
unarchived, firstTime, err := p.unarchiveMetric(1)
if err != nil {
t.Fatal(err)
}
if !unarchived {
t.Fatal("expected actual unarchival")
}
unarchived, err = p.unarchiveMetric(1)
if firstTime != 2 {
t.Errorf("expected first time 2, got %v", firstTime)
}
unarchived, firstTime, err = p.unarchiveMetric(1)
if err != nil {
t.Fatal(err)
}
@@ -535,13 +538,16 @@ func TestIndexing(t *testing.T) {
verifyIndexedState(i, t, batches[i], indexedFpsToMetrics, p)
for fp, m := range b.fpToMetric {
p.unindexMetric(fp, m)
unarchived, err := p.unarchiveMetric(fp)
unarchived, firstTime, err := p.unarchiveMetric(fp)
if err != nil {
t.Fatal(err)
}
if !unarchived {
t.Errorf("%d. metric not unarchived", i)
}
if firstTime != 1 {
t.Errorf("%d. expected firstTime=1, got %v", i, firstTime)
}
delete(indexedFpsToMetrics, fp)
}
}

storage/local/series.go

@@ -144,8 +144,13 @@ type memorySeries struct {
// special case: There are chunks on disk, but the offset to the
// chunkDescs in memory is unknown. Also, there is no overlap between
// chunks on disk and chunks in memory (implying that upon first
// persiting of a chunk in memory, the offset has to be set).
// persisting of a chunk in memory, the offset has to be set).
chunkDescsOffset int
// The savedFirstTime field is used as a fallback when the
// chunkDescsOffset is not 0. It can be used to save the firstTime of the
// first chunk before its chunk desc is evicted. If in doubt, this field is
// just set to the oldest possible timestamp.
savedFirstTime clientmodel.Timestamp
// Whether the current head chunk has already been scheduled to be
// persisted. If true, the current head chunk must not be modified
// anymore.
@@ -159,11 +164,17 @@ type memorySeries struct {
// newMemorySeries returns a pointer to a newly allocated memorySeries for the
// given metric. reallyNew defines if the memorySeries is a genuinely new series
// or (if false) a series for a metric being unarchived, i.e. a series that
// existed before but has been evicted from memory.
func newMemorySeries(m clientmodel.Metric, reallyNew bool) *memorySeries {
// existed before but has been evicted from memory. If reallyNew is false,
// firstTime is ignored (and set to the lowest possible timestamp instead - it
// will be set properly upon the first eviction of chunkDescs).
func newMemorySeries(m clientmodel.Metric, reallyNew bool, firstTime clientmodel.Timestamp) *memorySeries {
if reallyNew {
firstTime = math.MinInt64
}
s := memorySeries{
metric: m,
headChunkPersisted: !reallyNew,
savedFirstTime: firstTime,
}
if !reallyNew {
s.chunkDescsOffset = -1
@@ -252,6 +263,7 @@ func (s *memorySeries) evictOlderThan(t clientmodel.Timestamp) (allEvicted bool,
if iOldestNotEvicted != -1 {
lenToKeep := chunkDescEvictionFactor * (len(s.chunkDescs) - iOldestNotEvicted)
if lenToKeep < len(s.chunkDescs) {
s.savedFirstTime = s.firstTime()
lenEvicted := len(s.chunkDescs) - lenToKeep
s.chunkDescsOffset += lenEvicted
chunkDescOps.WithLabelValues(evict).Add(float64(lenEvicted))
@@ -470,7 +482,10 @@ func (s *memorySeries) values() metric.Values {
// firstTime returns the timestamp of the first sample in the series. The caller
// must have locked the fingerprint of the memorySeries.
func (s *memorySeries) firstTime() clientmodel.Timestamp {
return s.chunkDescs[0].firstTime()
if s.chunkDescsOffset == 0 && len(s.chunkDescs) > 0 {
return s.chunkDescs[0].firstTime()
}
return s.savedFirstTime
}
// lastTime returns the timestamp of the last sample in the series. The caller

storage/local/storage.go

@@ -15,7 +15,6 @@
package local
import (
"fmt"
"sync/atomic"
"time"
@@ -27,7 +26,10 @@ import (
"github.com/prometheus/prometheus/storage/metric"
)
const persistQueueCap = 1024
const (
persistQueueCap = 1024
chunkLen = 1024
)
type storageState uint
@@ -61,6 +63,7 @@ type memorySeriesStorage struct {
numSeries prometheus.Gauge
seriesOps *prometheus.CounterVec
ingestedSamplesCount prometheus.Counter
invalidPreloadRequestsCount prometheus.Counter
purgeDuration, evictDuration prometheus.Gauge
}
@@ -74,12 +77,13 @@ type MemorySeriesStorageOptions struct {
PersistencePurgeInterval time.Duration // How often to check for purging.
PersistenceRetentionPeriod time.Duration // Chunks at least that old are purged.
CheckpointInterval time.Duration // How often to checkpoint the series map and head chunks.
Dirty bool // Force the storage to consider itself dirty on startup.
}
// NewMemorySeriesStorage returns a newly allocated Storage. Storage.Serve still
// has to be called to start the storage.
func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) (Storage, error) {
p, err := newPersistence(o.PersistenceStoragePath, 1024)
p, err := newPersistence(o.PersistenceStoragePath, chunkLen, o.Dirty)
if err != nil {
return nil, err
}
@@ -150,6 +154,12 @@ func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) (Storage, error) {
Name: "ingested_samples_total",
Help: "The total number of samples ingested.",
}),
invalidPreloadRequestsCount: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "invalid_preload_requests_total",
Help: "The total number of preload requests referring to a non-existent series. This is an indication of outdated label indexes.",
}),
purgeDuration: prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: subsystem,
@@ -346,9 +356,10 @@ func (s *memorySeriesStorage) appendSample(sample *clientmodel.Sample) {
func (s *memorySeriesStorage) getOrCreateSeries(fp clientmodel.Fingerprint, m clientmodel.Metric) *memorySeries {
series, ok := s.fpToSeries.get(fp)
if !ok {
unarchived, err := s.persistence.unarchiveMetric(fp)
unarchived, firstTime, err := s.persistence.unarchiveMetric(fp)
if err != nil {
glog.Errorf("Error unarchiving fingerprint %v: %v", fp, err)
s.persistence.setDirty(true)
}
if unarchived {
s.seriesOps.WithLabelValues(unarchive).Inc()
@@ -357,7 +368,7 @@ func (s *memorySeriesStorage) getOrCreateSeries(fp clientmodel.Fingerprint, m cl
s.persistence.indexMetric(fp, m)
s.seriesOps.WithLabelValues(create).Inc()
}
series = newMemorySeries(m, !unarchived)
series = newMemorySeries(m, !unarchived, firstTime)
s.fpToSeries.put(fp, series)
s.numSeries.Inc()
}
@@ -389,7 +400,8 @@ func (s *memorySeriesStorage) preloadChunksForRange(
return nil, err
}
if !has {
return nil, fmt.Errorf("requested preload for non-existent series %v", fp)
s.invalidPreloadRequestsCount.Inc()
return nil, nil
}
if from.Add(-stalenessDelta).Before(last) && through.Add(stalenessDelta).After(first) {
metric, err := s.persistence.getArchivedMetric(fp)
@@ -421,8 +433,7 @@ func (s *memorySeriesStorage) handlePersistQueue() {
if err != nil {
s.persistErrors.WithLabelValues(err.Error()).Inc()
glog.Error("Error persisting chunk: ", err)
glog.Error("The storage is now inconsistent. Prepare for disaster.")
// TODO: Remove respective chunkDesc to at least be consistent?
s.persistence.setDirty(true)
continue
}
req.chunkDesc.unpin()
@@ -474,6 +485,7 @@ func (s *memorySeriesStorage) loop() {
m.fp, m.series.metric, m.series.firstTime(), m.series.lastTime(),
); err != nil {
glog.Errorf("Error archiving metric %v: %v", m.series.metric, err)
s.persistence.setDirty(true)
} else {
s.seriesOps.WithLabelValues(archive).Inc()
}
@@ -535,6 +547,7 @@ func (s *memorySeriesStorage) purgeSeries(fp clientmodel.Fingerprint, beforeTime
numDropped, allDropped, err := s.persistence.dropChunks(fp, beforeTime)
if err != nil {
glog.Error("Error purging persisted chunks: ", err)
s.persistence.setDirty(true)
}
// Purge chunks from memory accordingly.
@@ -563,6 +576,7 @@ func (s *memorySeriesStorage) purgeSeries(fp clientmodel.Fingerprint, beforeTime
if allDropped {
if err := s.persistence.dropArchivedMetric(fp); err != nil {
glog.Errorf("Error dropping archived metric for fingerprint %v: %v", fp, err)
s.persistence.setDirty(true)
} else {
s.seriesOps.WithLabelValues(archivePurge).Inc()
}
@@ -591,6 +605,7 @@ func (s *memorySeriesStorage) Describe(ch chan<- *prometheus.Desc) {
ch <- s.numSeries.Desc()
s.seriesOps.Describe(ch)
ch <- s.ingestedSamplesCount.Desc()
ch <- s.invalidPreloadRequestsCount.Desc()
ch <- s.purgeDuration.Desc()
ch <- s.evictDuration.Desc()
@@ -610,6 +625,7 @@ func (s *memorySeriesStorage) Collect(ch chan<- prometheus.Metric) {
ch <- s.numSeries
s.seriesOps.Collect(ch)
ch <- s.ingestedSamplesCount
ch <- s.invalidPreloadRequestsCount
ch <- s.purgeDuration
ch <- s.evictDuration