Mirror of https://github.com/prometheus/prometheus.git
Merge pull request #601 from prometheus/beorn7/rename-persist-queue

Rename persist queue len/cap to num/max chunks to persist.

Commit: a3bd2f6eb8
main.go: 26 lines changed (the diff below also includes hunks from the storage/local package)
@@ -52,17 +52,17 @@ var (
 	remoteTSDBUrl = flag.String("storage.remote.url", "", "The URL of the OpenTSDB instance to send samples to.")
 	remoteTSDBTimeout = flag.Duration("storage.remote.timeout", 30*time.Second, "The timeout to use when sending samples to OpenTSDB.")
 
-	samplesQueueCapacity = flag.Int("storage.incoming-samples-queue-capacity", 0, "Deprecated. Has no effect anymore.")
-
 	numMemoryChunks = flag.Int("storage.local.memory-chunks", 1024*1024, "How many chunks to keep in memory. While the size of a chunk is 1kiB, the total memory usage will be significantly higher than this value * 1kiB. Furthermore, for various reasons, more chunks might have to be kept in memory temporarily.")
 
 	persistenceRetentionPeriod = flag.Duration("storage.local.retention", 15*24*time.Hour, "How long to retain samples in the local storage.")
-	persistenceQueueCapacity = flag.Int("storage.local.persistence-queue-capacity", 1024*1024, "How many chunks can be waiting for being persisted before sample ingestion will stop. Many chunks waiting to be persisted will increase the checkpoint size.")
+	maxChunksToPersist = flag.Int("storage.local.max-chunks-to-persist", 1024*1024, "How many chunks can be waiting for persistence before sample ingestion will stop. Many chunks waiting to be persisted will increase the checkpoint size.")
 
 	checkpointInterval = flag.Duration("storage.local.checkpoint-interval", 5*time.Minute, "The period at which the in-memory metrics and the chunks not yet persisted to series files are checkpointed.")
 	checkpointDirtySeriesLimit = flag.Int("storage.local.checkpoint-dirty-series-limit", 5000, "If approx. that many time series are in a state that would require a recovery operation after a crash, a checkpoint is triggered, even if the checkpoint interval hasn't passed yet. A recovery operation requires a disk seek. The default limit intends to keep the recovery time below 1min even on spinning disks. With SSD, recovery is much faster, so you might want to increase this value in that case to avoid overly frequent checkpoints.")
+	seriesSyncStrategy = flag.String("storage.local.series-sync-strategy", "adaptive", "When to sync series files after modification. Possible values: 'never', 'always', 'adaptive'. Sync'ing slows down storage performance but reduces the risk of data loss in case of an OS crash. With the 'adaptive' strategy, series files are sync'd for as long as the storage is not too much behind on chunk persistence.")
 
 	storageDirty = flag.Bool("storage.local.dirty", false, "If set, the local storage layer will perform crash recovery even if the last shutdown appears to be clean.")
+	storagePedanticChecks = flag.Bool("storage.local.pedantic-checks", false, "If set, a crash recovery will perform checks on each series file. This might take a very long time.")
 
 	printVersion = flag.Bool("version", false, "Print version information.")
 )

@@ -89,14 +89,28 @@ func NewPrometheus() *prometheus {
 
 	notificationHandler := notification.NewNotificationHandler(*alertmanagerURL, *notificationQueueCapacity)
 
+	var syncStrategy local.SyncStrategy
+	switch *seriesSyncStrategy {
+	case "never":
+		syncStrategy = local.Never
+	case "always":
+		syncStrategy = local.Always
+	case "adaptive":
+		syncStrategy = local.Adaptive
+	default:
+		glog.Fatalf("Invalid flag value for 'storage.local.series-sync-strategy': %s", *seriesSyncStrategy)
+	}
+
 	o := &local.MemorySeriesStorageOptions{
 		MemoryChunks:               *numMemoryChunks,
+		MaxChunksToPersist:         *maxChunksToPersist,
 		PersistenceStoragePath:     *persistenceStoragePath,
 		PersistenceRetentionPeriod: *persistenceRetentionPeriod,
-		PersistenceQueueCapacity:   *persistenceQueueCapacity,
 		CheckpointInterval:         *checkpointInterval,
 		CheckpointDirtySeriesLimit: *checkpointDirtySeriesLimit,
 		Dirty:                      *storageDirty,
+		PedanticChecks:             *storagePedanticChecks,
+		SyncStrategy:               syncStrategy,
 	}
 	memStorage, err := local.NewMemorySeriesStorage(o)
 	if err != nil {
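The two main.go hunks above retire the old queue-capacity knob and thread the new sync-strategy and pedantic-check options through to the storage layer. The sketch below condenses that wiring into a standalone program, purely for illustration: the enum is inlined as plain types instead of using the real storage/local package, and log.Fatalf stands in for glog.Fatalf.

package main

import (
	"flag"
	"log"
)

// SyncStrategy mirrors the enum this commit adds to the storage/local package.
type SyncStrategy int

const (
	_ SyncStrategy = iota // zero value stays invalid so an unset option is caught
	Never
	Always
	Adaptive
)

var seriesSyncStrategy = flag.String("storage.local.series-sync-strategy", "adaptive",
	"When to sync series files after modification: 'never', 'always', 'adaptive'.")

func main() {
	flag.Parse()

	// Same validation shape as the switch added to NewPrometheus: map the
	// string flag onto the typed enum, failing hard on unknown values.
	var syncStrategy SyncStrategy
	switch *seriesSyncStrategy {
	case "never":
		syncStrategy = Never
	case "always":
		syncStrategy = Always
	case "adaptive":
		syncStrategy = Adaptive
	default:
		log.Fatalf("Invalid flag value for 'storage.local.series-sync-strategy': %s", *seriesSyncStrategy)
	}
	log.Printf("using sync strategy %d", syncStrategy)
}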
@@ -193,6 +193,7 @@ func (p *persistence) sanitizeSeries(
 
 	bytesToTrim := fi.Size() % int64(chunkLen+chunkHeaderLen)
 	chunksInFile := int(fi.Size()) / (chunkLen + chunkHeaderLen)
+	modTime := fi.ModTime()
 	if bytesToTrim != 0 {
 		glog.Warningf(
 			"Truncating file %s to exactly %d chunks, trimming %d extraneous bytes.",

@@ -221,7 +222,11 @@ func (p *persistence) sanitizeSeries(
 		if s == nil {
 			panic("fingerprint mapped to nil pointer")
 		}
-		if bytesToTrim == 0 && s.chunkDescsOffset != -1 && chunksInFile == s.chunkDescsOffset+s.persistWatermark {
+		if !p.pedanticChecks &&
+			bytesToTrim == 0 &&
+			s.chunkDescsOffset != -1 &&
+			chunksInFile == s.chunkDescsOffset+s.persistWatermark &&
+			modTime.Equal(s.modTime) {
 			// Everything is consistent. We are good.
 			return fp, true
 		}
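The crash-recovery fast path now also requires the series file's modification time to match what the in-memory series recorded, which catches files changed behind the storage layer's back. Note that time.Time values are compared with Equal rather than ==, since == also compares location and monotonic-clock fields. A minimal sketch of the idea; the helper name is mine, not from the commit:

package main

import (
	"fmt"
	"os"
	"time"
)

// fileUnchangedSince reports whether path's modification time still matches
// the recorded value. Comparing with time.Time.Equal rather than == avoids
// false mismatches from differing locations or monotonic-clock readings.
func fileUnchangedSince(path string, recorded time.Time) bool {
	fi, err := os.Stat(path)
	if err != nil {
		return false // unknown is treated as changed, forcing the slow path
	}
	return fi.ModTime().Equal(recorded)
}

func main() {
	f, err := os.CreateTemp("", "modtime-demo")
	if err != nil {
		panic(err)
	}
	defer os.Remove(f.Name())
	f.Close()

	fi, _ := os.Stat(f.Name())
	recorded := fi.ModTime()
	fmt.Println(fileUnchangedSince(f.Name(), recorded)) // true

	// Bumping the mtime makes the check fail afterwards.
	os.Chtimes(f.Name(), time.Now(), recorded.Add(time.Second))
	fmt.Println(fileUnchangedSince(f.Name(), recorded)) // false
}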
@@ -238,8 +243,9 @@ func (p *persistence) sanitizeSeries(
 				s.metric, fp, chunksInFile,
 			)
 			s.chunkDescs = nil
-			s.chunkDescsOffset = -1
+			s.chunkDescsOffset = chunksInFile
 			s.persistWatermark = 0
+			s.modTime = modTime
 			return fp, true
 		}
 		// This is the tricky one: We have chunks from heads.db, but

@@ -265,6 +271,7 @@ func (p *persistence) sanitizeSeries(
 		}
 		s.persistWatermark = len(cds)
 		s.chunkDescsOffset = 0
+		s.modTime = modTime
 
 		lastTime := cds[len(cds)-1].lastTime()
 		keepIdx := -1

@@ -72,6 +72,12 @@ const (
 	// Op-types for chunkOps and chunkDescOps.
 	evict = "evict"
 	load = "load"
+
+	seriesLocationLabel = "location"
+
+	// Maintenance types for maintainSeriesDuration.
+	maintainInMemory = "memory"
+	maintainArchived = "archived"
 )
 
 func init() {

@@ -111,18 +111,21 @@ type persistence struct {
 	indexingQueueLength   prometheus.Gauge
 	indexingQueueCapacity prometheus.Metric
 	indexingBatchSizes    prometheus.Summary
-	indexingBatchLatency  prometheus.Summary
+	indexingBatchDuration prometheus.Summary
 	checkpointDuration    prometheus.Gauge
 
 	dirtyMtx       sync.Mutex     // Protects dirty and becameDirty.
 	dirty          bool           // true if persistence was started in dirty state.
 	becameDirty    bool           // true if an inconsistency came up during runtime.
-	dirtyFileName  string         // The file used for locking and to mark dirty state.
-	fLock          flock.Releaser // The file lock to protect against concurrent usage.
+	pedanticChecks bool           // true if crash recovery should check each series.
+	dirtyFileName  string         // The file used for locking and to mark dirty state.
+	fLock          flock.Releaser // The file lock to protect against concurrent usage.
+
+	shouldSync syncStrategy
 }
 
 // newPersistence returns a newly allocated persistence backed by local disk storage, ready to use.
-func newPersistence(basePath string, dirty bool) (*persistence, error) {
+func newPersistence(basePath string, dirty, pedanticChecks bool, shouldSync syncStrategy) (*persistence, error) {
 	dirtyPath := filepath.Join(basePath, dirtyFileName)
 	versionPath := filepath.Join(basePath, versionFileName)
 

@@ -211,12 +214,12 @@ func newPersistence(basePath string, dirty bool) (*persistence, error) {
 				Help: "Quantiles for indexing batch sizes (number of metrics per batch).",
 			},
 		),
-		indexingBatchLatency: prometheus.NewSummary(
+		indexingBatchDuration: prometheus.NewSummary(
 			prometheus.SummaryOpts{
 				Namespace: namespace,
 				Subsystem: subsystem,
-				Name:      "indexing_batch_latency_milliseconds",
-				Help:      "Quantiles for batch indexing latencies in milliseconds.",
+				Name:      "indexing_batch_duration_milliseconds",
+				Help:      "Quantiles for batch indexing duration in milliseconds.",
 			},
 		),
 		checkpointDuration: prometheus.NewGauge(prometheus.GaugeOpts{

@@ -225,9 +228,11 @@ func newPersistence(basePath string, dirty bool) (*persistence, error) {
 			Name:      "checkpoint_duration_milliseconds",
 			Help:      "The duration (in milliseconds) it took to checkpoint in-memory metrics and head chunks.",
 		}),
 		dirty:          dirty,
-		dirtyFileName:  dirtyPath,
-		fLock:          fLock,
+		pedanticChecks: pedanticChecks,
+		dirtyFileName:  dirtyPath,
+		fLock:          fLock,
+		shouldSync:     shouldSync,
 	}
 
 	if p.dirty {

@@ -259,7 +264,7 @@ func (p *persistence) Describe(ch chan<- *prometheus.Desc) {
 	ch <- p.indexingQueueLength.Desc()
 	ch <- p.indexingQueueCapacity.Desc()
 	p.indexingBatchSizes.Describe(ch)
-	p.indexingBatchLatency.Describe(ch)
+	p.indexingBatchDuration.Describe(ch)
 	ch <- p.checkpointDuration.Desc()
 }
 

@@ -270,7 +275,7 @@ func (p *persistence) Collect(ch chan<- prometheus.Metric) {
 	ch <- p.indexingQueueLength
 	ch <- p.indexingQueueCapacity
 	p.indexingBatchSizes.Collect(ch)
-	p.indexingBatchLatency.Collect(ch)
+	p.indexingBatchDuration.Collect(ch)
 	ch <- p.checkpointDuration
 }
 

@@ -340,7 +345,7 @@ func (p *persistence) persistChunks(fp clientmodel.Fingerprint, chunks []chunk)
 	if err != nil {
 		return -1, err
 	}
-	defer f.Close()
+	defer p.closeChunkFile(f)
 
 	if err := writeChunks(f, chunks); err != nil {
 		return -1, err

@@ -477,7 +482,11 @@ func (p *persistence) loadChunkDescs(fp clientmodel.Fingerprint, beforeTime clie
 //
 // (4.4) The varint-encoded persistWatermark. (Missing in v1.)
 //
-// (4.5) The varint-encoded chunkDescsOffset.
+// (4.5) The modification time of the series file as nanoseconds elapsed since
+// January 1, 1970 UTC. -1 if the modification time is unknown or no series file
+// exists yet. (Missing in v1.)
+//
+// (4.6) The varint-encoded chunkDescsOffset.
 //
 // (4.6) The varint-encoded savedFirstTime.
 //
@@ -569,6 +578,15 @@ func (p *persistence) checkpointSeriesMapAndHeads(fingerprintToSeriesMap
 			if _, err = codable.EncodeVarint(w, int64(m.series.persistWatermark)); err != nil {
 				return
 			}
+			if m.series.modTime.IsZero() {
+				if _, err = codable.EncodeVarint(w, -1); err != nil {
+					return
+				}
+			} else {
+				if _, err = codable.EncodeVarint(w, m.series.modTime.UnixNano()); err != nil {
+					return
+				}
+			}
 			if _, err = codable.EncodeVarint(w, int64(m.series.chunkDescsOffset)); err != nil {
 				return
 			}
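The checkpoint stores the mod time as a single varint: UnixNano for a known time, and -1 as the sentinel for the zero time.Time ("unknown or no series file yet"). A self-contained round trip of that scheme, using only encoding/binary rather than the repository's codable package:

package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"time"
)

// encodeModTime writes t.UnixNano() as a varint, or -1 if t is the zero time.
func encodeModTime(w *bytes.Buffer, t time.Time) {
	var buf [binary.MaxVarintLen64]byte
	v := int64(-1)
	if !t.IsZero() {
		v = t.UnixNano()
	}
	n := binary.PutVarint(buf[:], v)
	w.Write(buf[:n])
}

// decodeModTime reverses encodeModTime, mapping -1 back to the zero time.
func decodeModTime(r *bytes.Buffer) (time.Time, error) {
	v, err := binary.ReadVarint(r)
	if err != nil {
		return time.Time{}, err
	}
	if v == -1 {
		return time.Time{}, nil
	}
	return time.Unix(0, v), nil
}

func main() {
	var buf bytes.Buffer
	encodeModTime(&buf, time.Time{}) // unknown mod time
	encodeModTime(&buf, time.Unix(0, 1425000000000000000))

	t1, _ := decodeModTime(&buf)
	t2, _ := decodeModTime(&buf)
	fmt.Println(t1.IsZero(), t2.UnixNano()) // true 1425000000000000000
}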
@@ -627,7 +645,7 @@ func (p *persistence) checkpointSeriesMapAndHeads(fingerprintToSeriesMap
 // unrecoverable error is encountered, it is returned. Call this method during
 // start-up while nothing else is running in storage land. This method is
 // utterly goroutine-unsafe.
-func (p *persistence) loadSeriesMapAndHeads() (sm *seriesMap, persistQueueLen int64, err error) {
+func (p *persistence) loadSeriesMapAndHeads() (sm *seriesMap, chunksToPersist int64, err error) {
 	var chunkDescsTotal int64
 	fingerprintToSeries := make(map[clientmodel.Fingerprint]*memorySeries)
 	sm = &seriesMap{m: fingerprintToSeries}

@@ -690,48 +708,58 @@ func (p *persistence) loadSeriesMapAndHeads() (sm *seriesMap, persistQueueLen in
 		if err != nil {
 			glog.Warning("Could not read series flags:", err)
 			p.dirty = true
-			return sm, persistQueueLen, nil
+			return sm, chunksToPersist, nil
 		}
 		headChunkPersisted := seriesFlags&flagHeadChunkPersisted != 0
 		fp, err := codable.DecodeUint64(r)
 		if err != nil {
 			glog.Warning("Could not decode fingerprint:", err)
 			p.dirty = true
-			return sm, persistQueueLen, nil
+			return sm, chunksToPersist, nil
 		}
 		var metric codable.Metric
 		if err := metric.UnmarshalFromReader(r); err != nil {
 			glog.Warning("Could not decode metric:", err)
 			p.dirty = true
-			return sm, persistQueueLen, nil
+			return sm, chunksToPersist, nil
 		}
 		var persistWatermark int64
+		var modTime time.Time
 		if version != headsFormatLegacyVersion {
 			// persistWatermark only present in v2.
 			persistWatermark, err = binary.ReadVarint(r)
 			if err != nil {
 				glog.Warning("Could not decode persist watermark:", err)
 				p.dirty = true
-				return sm, persistQueueLen, nil
+				return sm, chunksToPersist, nil
+			}
+			modTimeNano, err := binary.ReadVarint(r)
+			if err != nil {
+				glog.Warning("Could not decode modification time:", err)
+				p.dirty = true
+				return sm, chunksToPersist, nil
+			}
+			if modTimeNano != -1 {
+				modTime = time.Unix(0, modTimeNano)
 			}
 		}
 		chunkDescsOffset, err := binary.ReadVarint(r)
 		if err != nil {
 			glog.Warning("Could not decode chunk descriptor offset:", err)
 			p.dirty = true
-			return sm, persistQueueLen, nil
+			return sm, chunksToPersist, nil
 		}
 		savedFirstTime, err := binary.ReadVarint(r)
 		if err != nil {
 			glog.Warning("Could not decode saved first time:", err)
 			p.dirty = true
-			return sm, persistQueueLen, nil
+			return sm, chunksToPersist, nil
 		}
 		numChunkDescs, err := binary.ReadVarint(r)
 		if err != nil {
 			glog.Warning("Could not decode number of chunk descriptors:", err)
 			p.dirty = true
-			return sm, persistQueueLen, nil
+			return sm, chunksToPersist, nil
 		}
 		chunkDescs := make([]*chunkDesc, numChunkDescs)
 		if version == headsFormatLegacyVersion {

@@ -748,13 +776,13 @@ func (p *persistence) loadSeriesMapAndHeads() (sm *seriesMap, persistQueueLen in
 				if err != nil {
 					glog.Warning("Could not decode first time:", err)
 					p.dirty = true
-					return sm, persistQueueLen, nil
+					return sm, chunksToPersist, nil
 				}
 				lastTime, err := binary.ReadVarint(r)
 				if err != nil {
 					glog.Warning("Could not decode last time:", err)
 					p.dirty = true
-					return sm, persistQueueLen, nil
+					return sm, chunksToPersist, nil
 				}
 				chunkDescs[i] = &chunkDesc{
 					chunkFirstTime: clientmodel.Timestamp(firstTime),

@@ -767,16 +795,16 @@ func (p *persistence) loadSeriesMapAndHeads() (sm *seriesMap, persistQueueLen in
 				if err != nil {
 					glog.Warning("Could not decode chunk type:", err)
 					p.dirty = true
-					return sm, persistQueueLen, nil
+					return sm, chunksToPersist, nil
 				}
 				chunk := newChunkForEncoding(chunkEncoding(encoding))
 				if err := chunk.unmarshal(r); err != nil {
 					glog.Warning("Could not decode chunk type:", err)
 					p.dirty = true
-					return sm, persistQueueLen, nil
+					return sm, chunksToPersist, nil
 				}
 				chunkDescs[i] = newChunkDesc(chunk)
-				persistQueueLen++
+				chunksToPersist++
 			}
 		}
 

@@ -784,12 +812,13 @@ func (p *persistence) loadSeriesMapAndHeads() (sm *seriesMap, persistQueueLen in
 			metric:           clientmodel.Metric(metric),
 			chunkDescs:       chunkDescs,
 			persistWatermark: int(persistWatermark),
+			modTime:          modTime,
 			chunkDescsOffset: int(chunkDescsOffset),
 			savedFirstTime:   clientmodel.Timestamp(savedFirstTime),
 			headChunkClosed:  persistWatermark >= numChunkDescs,
 		}
 	}
-	return sm, persistQueueLen, nil
+	return sm, chunksToPersist, nil
 }
 
 // dropAndPersistChunks deletes all chunks from a series file whose last sample

@@ -921,7 +950,7 @@ func (p *persistence) dropAndPersistChunks(
 		return
 	}
 	defer func() {
-		temp.Close()
+		p.closeChunkFile(temp)
 		if err == nil {
 			err = os.Rename(p.tempFileNameForFingerprint(fp), p.fileNameForFingerprint(fp))
 		}

@@ -962,6 +991,17 @@ func (p *persistence) deleteSeriesFile(fp clientmodel.Fingerprint) (int, error)
 	return numChunks, nil
 }
 
+// getSeriesFileModTime returns the modification time of the series file
+// belonging to the provided fingerprint. In case of an error, the zero value of
+// time.Time is returned.
+func (p *persistence) getSeriesFileModTime(fp clientmodel.Fingerprint) time.Time {
+	var modTime time.Time
+	if fi, err := os.Stat(p.fileNameForFingerprint(fp)); err == nil {
+		return fi.ModTime()
+	}
+	return modTime
+}
+
 // indexMetric queues the given metric for addition to the indexes needed by
 // getFingerprintsForLabelPair, getLabelValuesForLabelName, and
 // getFingerprintsModifiedBefore. If the queue is full, this method blocks

@@ -1195,6 +1235,19 @@ func (p *persistence) openChunkFileForWriting(fp clientmodel.Fingerprint) (*os.F
 	// would still be detected.
 }
 
+// closeChunkFile first syncs the provided file if mandated so by the sync
+// strategy. Then it closes the file. Errors are logged.
+func (p *persistence) closeChunkFile(f *os.File) {
+	if p.shouldSync() {
+		if err := f.Sync(); err != nil {
+			glog.Error("Error syncing file:", err)
+		}
+	}
+	if err := f.Close(); err != nil {
+		glog.Error("Error closing chunk file:", err)
+	}
+}
+
 func (p *persistence) openChunkFileForReading(fp clientmodel.Fingerprint) (*os.File, error) {
 	return os.Open(p.fileNameForFingerprint(fp))
 }
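closeChunkFile consults the injected syncStrategy closure and fsyncs before closing, so the "never" and degraded paths skip the expensive sync while "always" keeps durability. The pattern in isolation, as a minimal sketch with names of my own choosing:

package main

import (
	"log"
	"os"
)

// syncStrategy mirrors the function type this commit introduces: it decides,
// at close time, whether a series file should be fsynced.
type syncStrategy func() bool

// closeFile syncs f first if the strategy asks for it, then closes it.
// Syncing before close is what actually forces the data to stable storage;
// Close alone only guarantees the file descriptor is released.
func closeFile(f *os.File, shouldSync syncStrategy) {
	if shouldSync() {
		if err := f.Sync(); err != nil {
			log.Println("Error syncing file:", err)
		}
	}
	if err := f.Close(); err != nil {
		log.Println("Error closing file:", err)
	}
}

func main() {
	f, err := os.CreateTemp("", "sync-demo")
	if err != nil {
		log.Fatal(err)
	}
	defer os.Remove(f.Name())

	f.WriteString("some chunk data")
	always := func() bool { return true }
	closeFile(f, always)
}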
@@ -1217,7 +1270,9 @@ func (p *persistence) processIndexingQueue() {
 	commitBatch := func() {
 		p.indexingBatchSizes.Observe(float64(batchSize))
 		defer func(begin time.Time) {
-			p.indexingBatchLatency.Observe(float64(time.Since(begin) / time.Millisecond))
+			p.indexingBatchDuration.Observe(
+				float64(time.Since(begin)) / float64(time.Millisecond),
+			)
 		}(time.Now())
 
 		if err := p.labelPairToFingerprints.IndexBatch(pairToFPs); err != nil {
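Besides the metric rename, the observation itself changes here: float64(time.Since(begin) / time.Millisecond) divides two integer time.Duration values first, truncating to whole milliseconds, while float64(time.Since(begin)) / float64(time.Millisecond) keeps the fractional part. A self-contained comparison:

package main

import (
	"fmt"
	"time"
)

func main() {
	d := 1500 * time.Microsecond // 1.5ms

	// Integer division on time.Duration truncates before the conversion.
	truncated := float64(d / time.Millisecond)

	// Converting first keeps sub-millisecond precision.
	precise := float64(d) / float64(time.Millisecond)

	fmt.Println(truncated, precise) // 1 1.5
}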
@@ -36,7 +36,7 @@ var (
 func newTestPersistence(t *testing.T, encoding chunkEncoding) (*persistence, test.Closer) {
 	*defaultChunkEncoding = int(encoding)
 	dir := test.NewTemporaryDirectory("test_persistence", t)
-	p, err := newPersistence(dir.Path(), false)
+	p, err := newPersistence(dir.Path(), false, false, func() bool { return false })
 	if err != nil {
 		dir.Close()
 		t.Fatal(err)

@@ -143,6 +143,9 @@ type memorySeries struct {
 	// points to a non-persisted chunk. If all chunks are persisted, then
 	// persistWatermark == len(chunkDescs).
 	persistWatermark int
+	// The modification time of the series file. The zero value of time.Time
+	// is used to mark an unknown modification time.
+	modTime time.Time
 	// The chunkDescs in memory might not have all the chunkDescs for the
 	// chunks that are persisted to disk. The missing chunkDescs are all
 	// contiguous and at the tail end. chunkDescsOffset is the index of the

@@ -36,12 +36,23 @@ const (
 	fpMaxSweepTime = 6 * time.Hour
 
 	maxEvictInterval = time.Minute
+
+	// If numChunksToPersist is this percentage of maxChunksToPersist, we
+	// consider the storage in "graceful degradation mode", i.e. we do not
+	// checkpoint anymore based on the dirty series count, and we do not
+	// sync series files anymore if using the adaptive sync strategy.
+	percentChunksToPersistForDegradation = 80
 )
 
 var (
-	persistQueueLengthDesc = prometheus.NewDesc(
-		prometheus.BuildFQName(namespace, subsystem, "persist_queue_length"),
-		"The current number of chunks waiting in the persist queue.",
+	numChunksToPersistDesc = prometheus.NewDesc(
+		prometheus.BuildFQName(namespace, subsystem, "chunks_to_persist"),
+		"The current number of chunks waiting for persistence.",
+		nil, nil,
+	)
+	maxChunksToPersistDesc = prometheus.NewDesc(
+		prometheus.BuildFQName(namespace, subsystem, "max_chunks_to_persist"),
+		"The maximum number of chunks that can be waiting for persistence before sample ingestion will stop.",
 		nil, nil,
 	)
 )
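With the defaults from main.go, the degradation threshold works out concretely: 80% of the default storage.local.max-chunks-to-persist value of 1024*1024 is 838,860 chunks. The arithmetic, in the same shape the code uses (integer math, multiply before divide):

package main

import "fmt"

const percentChunksToPersistForDegradation = 80

func main() {
	maxChunksToPersist := 1024 * 1024 // default of -storage.local.max-chunks-to-persist

	// Same expression shape as isDegraded: multiply first so the integer
	// division doesn't throw away precision.
	threshold := maxChunksToPersist * percentChunksToPersistForDegradation / 100
	fmt.Println(threshold) // 838860
}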
@@ -51,6 +62,21 @@ type evictRequest struct {
 	evict bool
 }
 
+// SyncStrategy is an enum to select a sync strategy for series files.
+type SyncStrategy int
+
+// Possible values for SyncStrategy.
+const (
+	_ SyncStrategy = iota
+	Never
+	Always
+	Adaptive
+)
+
+// A syncStrategy is a function that returns whether series files should be
+// synced or not. It does not need to be goroutine safe.
+type syncStrategy func() bool
+
 type memorySeriesStorage struct {
 	fpLocker   *fingerprintLocker
 	fpToSeries *seriesMap
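Starting the enum at "_ SyncStrategy = iota" deliberately burns the zero value, so a MemorySeriesStorageOptions with SyncStrategy left unset does not silently mean "never"; the switch in NewMemorySeriesStorage then rejects it. A tiny demonstration of why reserving zero helps:

package main

import "fmt"

type SyncStrategy int

const (
	_ SyncStrategy = iota // reserve 0 as "unset"
	Never
	Always
	Adaptive
)

func describe(s SyncStrategy) string {
	switch s {
	case Never:
		return "never sync"
	case Always:
		return "always sync"
	case Adaptive:
		return "sync unless degraded"
	default:
		return "unset or unknown: reject" // what NewMemorySeriesStorage panics on
	}
}

func main() {
	var zero SyncStrategy // zero value of the field in an options struct
	fmt.Println(describe(zero))     // unset or unknown: reject
	fmt.Println(describe(Adaptive)) // sync unless degraded
}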
@@ -61,25 +87,22 @@ type memorySeriesStorage struct {
 	checkpointInterval         time.Duration
 	checkpointDirtySeriesLimit int
 
-	persistQueueLen int64 // The number of chunks that need persistence.
-	persistQueueCap int   // If persistQueueLen reaches this threshold, ingestion will stall.
-	// Note that internally, the chunks to persist are not organized in a queue-like data structure,
-	// but handled in a more sophisticated way (see maintainMemorySeries).
+	numChunksToPersist int64 // The number of chunks waiting for persistence.
+	maxChunksToPersist int   // If numChunksToPersist reaches this threshold, ingestion will stall.
+	degraded           bool
 
 	persistence *persistence
 
-	countPersistedHeadChunks chan struct{}
-
 	evictList                   *list.List
 	evictRequests               chan evictRequest
 	evictStopping, evictStopped chan struct{}
 
 	persistErrors               prometheus.Counter
-	persistQueueCapacity        prometheus.Metric
 	numSeries                   prometheus.Gauge
 	seriesOps                   *prometheus.CounterVec
 	ingestedSamplesCount        prometheus.Counter
 	invalidPreloadRequestsCount prometheus.Counter
+	maintainSeriesDuration      *prometheus.SummaryVec
 }
 
 // MemorySeriesStorageOptions contains options needed by

@@ -87,38 +110,21 @@ type memorySeriesStorage struct {
 // values.
 type MemorySeriesStorageOptions struct {
 	MemoryChunks               int           // How many chunks to keep in memory.
+	MaxChunksToPersist         int           // Max number of chunks waiting to be persisted.
 	PersistenceStoragePath     string        // Location of persistence files.
 	PersistenceRetentionPeriod time.Duration // Chunks at least that old are dropped.
-	PersistenceQueueCapacity   int           // Capacity of queue for chunks to be persisted.
 	CheckpointInterval         time.Duration // How often to checkpoint the series map and head chunks.
 	CheckpointDirtySeriesLimit int           // How many dirty series will trigger an early checkpoint.
 	Dirty                      bool          // Force the storage to consider itself dirty on startup.
+	PedanticChecks             bool          // If dirty, perform crash-recovery checks on each series file.
+	SyncStrategy               SyncStrategy  // Which sync strategy to apply to series files.
 }
 
 // NewMemorySeriesStorage returns a newly allocated Storage. Storage.Serve still
 // has to be called to start the storage.
 func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) (Storage, error) {
-	p, err := newPersistence(o.PersistenceStoragePath, o.Dirty)
-	if err != nil {
-		return nil, err
-	}
-	glog.Info("Loading series map and head chunks...")
-	fpToSeries, persistQueueLen, err := p.loadSeriesMapAndHeads()
-	if err != nil {
-		return nil, err
-	}
-	glog.Infof("%d series loaded.", fpToSeries.length())
-	numSeries := prometheus.NewGauge(prometheus.GaugeOpts{
-		Namespace: namespace,
-		Subsystem: subsystem,
-		Name:      "memory_series",
-		Help:      "The current number of series in memory.",
-	})
-	numSeries.Set(float64(fpToSeries.length()))
-
 	s := &memorySeriesStorage{
 		fpLocker:   newFingerprintLocker(1024),
-		fpToSeries: fpToSeries,
 
 		loopStopping: make(chan struct{}),
 		loopStopped:  make(chan struct{}),

@@ -127,11 +133,7 @@ func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) (Storage, error) {
 		checkpointInterval:         o.CheckpointInterval,
 		checkpointDirtySeriesLimit: o.CheckpointDirtySeriesLimit,
 
-		persistQueueLen: persistQueueLen,
-		persistQueueCap: o.PersistenceQueueCapacity,
-		persistence:     p,
-
-		countPersistedHeadChunks: make(chan struct{}, 100),
+		maxChunksToPersist: o.MaxChunksToPersist,
 
 		evictList:     list.New(),
 		evictRequests: make(chan evictRequest, evictRequestsCap),

@@ -144,15 +146,12 @@
 			Name: "persist_errors_total",
 			Help: "The total number of errors while persisting chunks.",
 		}),
-		persistQueueCapacity: prometheus.MustNewConstMetric(
-			prometheus.NewDesc(
-				prometheus.BuildFQName(namespace, subsystem, "persist_queue_capacity"),
-				"The total capacity of the persist queue.",
-				nil, nil,
-			),
-			prometheus.GaugeValue, float64(o.PersistenceQueueCapacity),
-		),
-		numSeries: numSeries,
+		numSeries: prometheus.NewGauge(prometheus.GaugeOpts{
+			Namespace: namespace,
+			Subsystem: subsystem,
+			Name:      "memory_series",
+			Help:      "The current number of series in memory.",
+		}),
 		seriesOps: prometheus.NewCounterVec(
 			prometheus.CounterOpts{
 				Namespace: namespace,

@@ -174,8 +173,43 @@
 			Name: "invalid_preload_requests_total",
 			Help: "The total number of preload requests referring to a non-existent series. This is an indication of outdated label indexes.",
 		}),
+		maintainSeriesDuration: prometheus.NewSummaryVec(
+			prometheus.SummaryOpts{
+				Namespace: namespace,
+				Subsystem: subsystem,
+				Name:      "maintain_series_duration_milliseconds",
+				Help:      "The duration (in milliseconds) it took to perform maintenance on a series.",
+			},
+			[]string{seriesLocationLabel},
+		),
 	}
 
+	var syncStrategy syncStrategy
+	switch o.SyncStrategy {
+	case Never:
+		syncStrategy = func() bool { return false }
+	case Always:
+		syncStrategy = func() bool { return true }
+	case Adaptive:
+		syncStrategy = func() bool { return !s.isDegraded() }
+	default:
+		panic("unknown sync strategy")
+	}
+
+	p, err := newPersistence(o.PersistenceStoragePath, o.Dirty, o.PedanticChecks, syncStrategy)
+	if err != nil {
+		return nil, err
+	}
+	s.persistence = p
+
+	glog.Info("Loading series map and head chunks...")
+	s.fpToSeries, s.numChunksToPersist, err = p.loadSeriesMapAndHeads()
+	if err != nil {
+		return nil, err
+	}
+	glog.Infof("%d series loaded.", s.fpToSeries.length())
+	s.numSeries.Set(float64(s.fpToSeries.length()))
+
 	return s, nil
 }
 
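Note the constructor's reordering: the s := &memorySeriesStorage{...} literal now comes first, because the Adaptive case builds a closure over s.isDegraded() and that closure must be handed to newPersistence. The closure only dereferences s when invoked, so capturing the not-yet-fully-wired struct is safe. A stripped-down model of the pattern, with all names hypothetical:

package main

import "fmt"

type storage struct {
	degraded   bool
	shouldSync func() bool // filled in after construction
}

func main() {
	// Step 1: allocate the struct so the closure has something to point at.
	s := &storage{}

	// Step 2: the closure captures s; it reads s.degraded only when invoked,
	// so it is fine that s isn't fully initialized yet.
	adaptive := func() bool { return !s.degraded }

	// Step 3: finish wiring (in the commit, this is newPersistence + load).
	s.shouldSync = adaptive

	fmt.Println(s.shouldSync()) // true: not degraded, so sync
	s.degraded = true
	fmt.Println(s.shouldSync()) // false: degraded, skip syncing
}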
@@ -337,12 +371,12 @@ func (s *memorySeriesStorage) GetMetricForFingerprint(fp clientmodel.Fingerprint
 
 // Append implements Storage.
 func (s *memorySeriesStorage) Append(sample *clientmodel.Sample) {
-	if s.getPersistQueueLen() >= s.persistQueueCap {
+	if s.getNumChunksToPersist() >= s.maxChunksToPersist {
 		glog.Warningf(
 			"%d chunks waiting for persistence, sample ingestion suspended.",
-			s.getPersistQueueLen(),
+			s.getNumChunksToPersist(),
 		)
-		for s.getPersistQueueLen() >= s.persistQueueCap {
+		for s.getNumChunksToPersist() >= s.maxChunksToPersist {
 			time.Sleep(time.Second)
 		}
 		glog.Warning("Sample ingestion resumed.")
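Append applies backpressure against the renamed counter: once numChunksToPersist reaches maxChunksToPersist, the ingesting goroutine parks in a one-second sleep loop until series maintenance drains the backlog. The shape of that throttle, reduced to an atomic counter; this is a sketch, not the storage code itself:

package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

type throttler struct {
	pending int64 // chunks waiting for persistence
	max     int64
}

func (t *throttler) get() int64   { return atomic.LoadInt64(&t.pending) }
func (t *throttler) add(by int64) { atomic.AddInt64(&t.pending, by) }

// append blocks while the backlog is at capacity, mirroring the warn/sleep/
// resume structure of memorySeriesStorage.Append.
func (t *throttler) append() {
	if t.get() >= t.max {
		fmt.Printf("%d chunks waiting for persistence, sample ingestion suspended.\n", t.get())
		for t.get() >= t.max {
			time.Sleep(10 * time.Millisecond) // 1s in the real code
		}
		fmt.Println("Sample ingestion resumed.")
	}
	// ... ingest the sample ...
}

func main() {
	t := &throttler{max: 1}
	t.add(1) // backlog full

	go func() {
		time.Sleep(50 * time.Millisecond)
		t.add(-1) // persistence catches up
	}()
	t.append() // blocks ~50ms, then resumes
}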
@@ -356,7 +390,7 @@ func (s *memorySeriesStorage) Append(sample *clientmodel.Sample) {
 	})
 	s.fpLocker.Unlock(fp)
 	s.ingestedSamplesCount.Inc()
-	s.incPersistQueueLen(completedChunksCount)
+	s.incNumChunksToPersist(completedChunksCount)
 }
 
 func (s *memorySeriesStorage) getOrCreateSeries(fp clientmodel.Fingerprint, m clientmodel.Metric) *memorySeries {

@@ -635,20 +669,14 @@ loop:
 		case fp := <-memoryFingerprints:
 			if s.maintainMemorySeries(fp, clientmodel.TimestampFromTime(time.Now()).Add(-s.dropAfter)) {
 				dirtySeriesCount++
-				// Check if we have enough "dirty" series so
-				// that we need an early checkpoint. However,
-				// if we are already at 90% capacity of the
-				// persist queue, creating a checkpoint would be
-				// counterproductive, as it would slow down
-				// chunk persisting even more, while in a
-				// situation like that, where we are clearly
-				// lacking speed of disk maintenance, the best
-				// we can do for crash recovery is to work
-				// through the persist queue as quickly as
-				// possible. So only checkpoint if the persist
-				// queue is at most 90% full.
-				if dirtySeriesCount >= s.checkpointDirtySeriesLimit &&
-					s.getPersistQueueLen() < s.persistQueueCap*9/10 {
+				// Check if we have enough "dirty" series so that we need an early checkpoint.
+				// However, if we are already behind persisting chunks, creating a checkpoint
+				// would be counterproductive, as it would slow down chunk persisting even more,
+				// while in a situation like that, where we are clearly lacking speed of disk
+				// maintenance, the best we can do for crash recovery is to persist chunks as
+				// quickly as possible. So only checkpoint if the storage is not in "graceful
+				// degradation mode".
+				if dirtySeriesCount >= s.checkpointDirtySeriesLimit && !s.isDegraded() {
 					checkpointTimer.Reset(0)
 				}
 			}
|
||||||
func (s *memorySeriesStorage) maintainMemorySeries(
|
func (s *memorySeriesStorage) maintainMemorySeries(
|
||||||
fp clientmodel.Fingerprint, beforeTime clientmodel.Timestamp,
|
fp clientmodel.Fingerprint, beforeTime clientmodel.Timestamp,
|
||||||
) (becameDirty bool) {
|
) (becameDirty bool) {
|
||||||
|
defer func(begin time.Time) {
|
||||||
|
s.maintainSeriesDuration.WithLabelValues(maintainInMemory).Observe(
|
||||||
|
float64(time.Since(begin)) / float64(time.Millisecond),
|
||||||
|
)
|
||||||
|
}(time.Now())
|
||||||
|
|
||||||
s.fpLocker.Lock(fp)
|
s.fpLocker.Lock(fp)
|
||||||
defer s.fpLocker.Unlock(fp)
|
defer s.fpLocker.Unlock(fp)
|
||||||
|
|
||||||
|
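The timing wrapper relies on a Go evaluation rule: in defer func(begin time.Time){...}(time.Now()), the argument is evaluated when the defer statement executes, pinning the start time, while the deferred body runs at return and observes the elapsed duration. A small demonstration:

package main

import (
	"fmt"
	"time"
)

func work() {
	// time.Now() is evaluated here, at the defer statement, capturing the
	// start; the deferred body runs at return and measures the difference.
	defer func(begin time.Time) {
		fmt.Printf("took %.1f ms\n", float64(time.Since(begin))/float64(time.Millisecond))
	}(time.Now())

	time.Sleep(25 * time.Millisecond)
}

func main() {
	work()
}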
@@ -709,7 +743,7 @@ func (s *memorySeriesStorage) maintainMemorySeries(
 	defer s.seriesOps.WithLabelValues(memoryMaintenance).Inc()
 
 	if series.maybeCloseHeadChunk() {
-		s.incPersistQueueLen(1)
+		s.incNumChunksToPersist(1)
 	}
 
 	seriesWasDirty := series.dirty

@@ -777,8 +811,9 @@ func (s *memorySeriesStorage) writeMemorySeries(
 		for _, cd := range cds {
 			cd.unpin(s.evictRequests)
 		}
-		s.incPersistQueueLen(-len(cds))
+		s.incNumChunksToPersist(-len(cds))
 		chunkOps.WithLabelValues(persistAndUnpin).Add(float64(len(cds)))
+		series.modTime = s.persistence.getSeriesFileModTime(fp)
 	}()
 
 	// Get the actual chunks from underneath the chunkDescs.

@@ -815,7 +850,8 @@ func (s *memorySeriesStorage) writeMemorySeries(
 	series.dropChunks(beforeTime)
 	if len(series.chunkDescs) == 0 { // All chunks dropped from memory series.
 		if !allDroppedFromPersistence {
-			panic("all chunks dropped from memory but chunks left in persistence")
+			glog.Errorf("All chunks dropped from memory but chunks left in persistence for fingerprint %v, series %v.", fp, series)
+			s.persistence.setDirty(true)
 		}
 		s.fpToSeries.del(fp)
 		s.numSeries.Dec()

@@ -829,7 +865,9 @@ func (s *memorySeriesStorage) writeMemorySeries(
 	} else {
 		series.chunkDescsOffset -= numDroppedFromPersistence
 		if series.chunkDescsOffset < 0 {
-			panic("dropped more chunks from persistence than from memory")
+			glog.Errorf("Dropped more chunks from persistence than from memory for fingerprint %v, series %v.", fp, series)
+			s.persistence.setDirty(true)
+			series.chunkDescsOffset = -1 // Makes sure it will be looked at during crash recovery.
 		}
 	}
 	return false

@@ -838,6 +876,12 @@ func (s *memorySeriesStorage) writeMemorySeries(
 // maintainArchivedSeries drops chunks older than beforeTime from an archived
 // series. If the series contains no chunks after that, it is purged entirely.
 func (s *memorySeriesStorage) maintainArchivedSeries(fp clientmodel.Fingerprint, beforeTime clientmodel.Timestamp) {
+	defer func(begin time.Time) {
+		s.maintainSeriesDuration.WithLabelValues(maintainArchived).Observe(
+			float64(time.Since(begin)) / float64(time.Millisecond),
+		)
+	}(time.Now())
+
 	s.fpLocker.Lock(fp)
 	defer s.fpLocker.Unlock(fp)
 
@@ -878,15 +922,37 @@ func (s *memorySeriesStorage) loadChunkDescs(fp clientmodel.Fingerprint, beforeT
 	return s.persistence.loadChunkDescs(fp, beforeTime)
 }
 
-// getPersistQueueLen returns persistQueueLen in a goroutine-safe way.
-func (s *memorySeriesStorage) getPersistQueueLen() int {
-	return int(atomic.LoadInt64(&s.persistQueueLen))
+// getNumChunksToPersist returns numChunksToPersist in a goroutine-safe way.
+func (s *memorySeriesStorage) getNumChunksToPersist() int {
+	return int(atomic.LoadInt64(&s.numChunksToPersist))
 }
 
-// incPersistQueueLen increments persistQueueLen in a goroutine-safe way. Use a
+// incNumChunksToPersist increments numChunksToPersist in a goroutine-safe way. Use a
 // negative 'by' to decrement.
-func (s *memorySeriesStorage) incPersistQueueLen(by int) {
-	atomic.AddInt64(&s.persistQueueLen, int64(by))
+func (s *memorySeriesStorage) incNumChunksToPersist(by int) {
+	atomic.AddInt64(&s.numChunksToPersist, int64(by))
+}
+
+// isDegraded returns whether the storage is in "graceful degradation mode",
+// which is the case if the number of chunks waiting for persistence has reached
+// a percentage of maxChunksToPersist that exceeds
+// percentChunksToPersistForDegradation. The method is not goroutine safe (but
+// only ever called from the goroutine dealing with series maintenance).
+// Changes of degradation mode are logged.
+func (s *memorySeriesStorage) isDegraded() bool {
+	nowDegraded := s.getNumChunksToPersist() > s.maxChunksToPersist*percentChunksToPersistForDegradation/100
+	if s.degraded && !nowDegraded {
+		glog.Warning("Storage has left graceful degradation mode. Things are back to normal.")
+	} else if !s.degraded && nowDegraded {
+		glog.Warningf(
+			"%d chunks waiting for persistence (%d%% of the allowed maximum %d). Storage is now in graceful degradation mode. Series files are not synced anymore if following the adaptive strategy. Checkpoints are not performed more often than every %v.",
+			s.getNumChunksToPersist(),
+			s.getNumChunksToPersist()*100/s.maxChunksToPersist,
+			s.maxChunksToPersist,
+			s.checkpointInterval)
+	}
+	s.degraded = nowDegraded
+	return s.degraded
 }
 
 // Describe implements prometheus.Collector.
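isDegraded logs only on state transitions: entering degradation warns loudly, leaving it logs the all-clear, and steady states stay quiet, so the log is not flooded while the backlog hovers around the threshold. The edge-triggered logic in isolation, as a sketch with hypothetical names:

package main

import "fmt"

// degradationTracker keeps the previous state so that only edges are logged,
// mirroring the s.degraded bookkeeping in isDegraded.
type degradationTracker struct {
	degraded bool
}

func (d *degradationTracker) update(pending, max int) bool {
	nowDegraded := pending > max*80/100
	if d.degraded && !nowDegraded {
		fmt.Println("left graceful degradation mode")
	} else if !d.degraded && nowDegraded {
		fmt.Printf("entered graceful degradation mode (%d%% of %d)\n", pending*100/max, max)
	}
	d.degraded = nowDegraded
	return d.degraded
}

func main() {
	var d degradationTracker
	d.update(700, 1000) // below 80%: quiet
	d.update(900, 1000) // crosses up: logs "entered ..."
	d.update(950, 1000) // still degraded: quiet
	d.update(100, 1000) // crosses down: logs "left ..."
}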
@@ -894,14 +960,14 @@ func (s *memorySeriesStorage) Describe(ch chan<- *prometheus.Desc) {
 	s.persistence.Describe(ch)
 
 	ch <- s.persistErrors.Desc()
-	ch <- s.persistQueueCapacity.Desc()
-	ch <- persistQueueLengthDesc
+	ch <- maxChunksToPersistDesc
+	ch <- numChunksToPersistDesc
 	ch <- s.numSeries.Desc()
 	s.seriesOps.Describe(ch)
 	ch <- s.ingestedSamplesCount.Desc()
 	ch <- s.invalidPreloadRequestsCount.Desc()
 
 	ch <- numMemChunksDesc
+	s.maintainSeriesDuration.Describe(ch)
 }
 
 // Collect implements prometheus.Collector.

@@ -909,20 +975,24 @@ func (s *memorySeriesStorage) Collect(ch chan<- prometheus.Metric) {
 	s.persistence.Collect(ch)
 
 	ch <- s.persistErrors
-	ch <- s.persistQueueCapacity
 	ch <- prometheus.MustNewConstMetric(
-		persistQueueLengthDesc,
+		maxChunksToPersistDesc,
 		prometheus.GaugeValue,
-		float64(s.getPersistQueueLen()),
+		float64(s.maxChunksToPersist),
+	)
+	ch <- prometheus.MustNewConstMetric(
+		numChunksToPersistDesc,
+		prometheus.GaugeValue,
+		float64(s.getNumChunksToPersist()),
 	)
 	ch <- s.numSeries
 	s.seriesOps.Collect(ch)
 	ch <- s.ingestedSamplesCount
 	ch <- s.invalidPreloadRequestsCount
 
 	ch <- prometheus.MustNewConstMetric(
 		numMemChunksDesc,
 		prometheus.GaugeValue,
 		float64(atomic.LoadInt64(&numMemChunks)),
 	)
+	s.maintainSeriesDuration.Collect(ch)
 }
 
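Both gauges are emitted with MustNewConstMetric, which stamps a fresh throwaway sample at scrape time; that suits values like getNumChunksToPersist() that are read on demand from an atomic rather than maintained in a prometheus.Gauge. A minimal collector built the same way; this sketch assumes today's github.com/prometheus/client_golang API, and the metric name and help string are placeholders:

package main

import (
	"fmt"
	"sync/atomic"

	"github.com/prometheus/client_golang/prometheus"
)

var backlog int64 // value kept outside the metrics system

var backlogDesc = prometheus.NewDesc(
	"example_chunks_to_persist",
	"The current number of chunks waiting for persistence.",
	nil, nil,
)

type collector struct{}

func (c collector) Describe(ch chan<- *prometheus.Desc) { ch <- backlogDesc }

// Collect reads the atomic at scrape time and wraps it in a one-shot metric.
func (c collector) Collect(ch chan<- prometheus.Metric) {
	ch <- prometheus.MustNewConstMetric(
		backlogDesc, prometheus.GaugeValue, float64(atomic.LoadInt64(&backlog)),
	)
}

func main() {
	atomic.StoreInt64(&backlog, 42)
	reg := prometheus.NewRegistry()
	reg.MustRegister(collector{})
	mfs, _ := reg.Gather()
	fmt.Println(mfs[0].GetName(), mfs[0].GetMetric()[0].GetGauge().GetValue()) // example_chunks_to_persist 42
}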
@@ -157,10 +157,11 @@ func TestLoop(t *testing.T) {
 	defer directory.Close()
 	o := &MemorySeriesStorageOptions{
 		MemoryChunks:               50,
+		MaxChunksToPersist:         1000000,
 		PersistenceRetentionPeriod: 24 * 7 * time.Hour,
 		PersistenceStoragePath:     directory.Path(),
-		PersistenceQueueCapacity:   1000000,
 		CheckpointInterval:         250 * time.Millisecond,
+		SyncStrategy:               Adaptive,
 	}
 	storage, err := NewMemorySeriesStorage(o)
 	if err != nil {

@@ -669,10 +670,11 @@ func benchmarkFuzz(b *testing.B, encoding chunkEncoding) {
 	defer directory.Close()
 	o := &MemorySeriesStorageOptions{
 		MemoryChunks:               100,
+		MaxChunksToPersist:         1000000,
 		PersistenceRetentionPeriod: time.Hour,
 		PersistenceStoragePath:     directory.Path(),
-		PersistenceQueueCapacity:   1000000,
 		CheckpointInterval:         time.Second,
+		SyncStrategy:               Adaptive,
 	}
 	s, err := NewMemorySeriesStorage(o)
 	if err != nil {

@@ -42,10 +42,11 @@ func NewTestStorage(t test.T, encoding chunkEncoding) (Storage, test.Closer) {
 	directory := test.NewTemporaryDirectory("test_storage", t)
 	o := &MemorySeriesStorageOptions{
 		MemoryChunks:               1000000,
+		MaxChunksToPersist:         1000000,
 		PersistenceRetentionPeriod: 24 * time.Hour * 365 * 100, // Enough to never trigger purging.
 		PersistenceStoragePath:     directory.Path(),
-		PersistenceQueueCapacity:   1000000,
 		CheckpointInterval:         time.Hour,
+		SyncStrategy:               Adaptive,
 	}
 	storage, err := NewMemorySeriesStorage(o)
 	if err != nil {