Mirror of https://github.com/prometheus/prometheus.git
Rename persist queue len/cap to num/max chunks to persist.
Remove deprecated flag storage.incoming-samples-queue-capacity.
commit da7c0461c6
parent a075900f9a

main.go | 6 ++----
main.go
@@ -52,12 +52,10 @@ var (
 	remoteTSDBUrl     = flag.String("storage.remote.url", "", "The URL of the OpenTSDB instance to send samples to.")
 	remoteTSDBTimeout = flag.Duration("storage.remote.timeout", 30*time.Second, "The timeout to use when sending samples to OpenTSDB.")

-	samplesQueueCapacity = flag.Int("storage.incoming-samples-queue-capacity", 0, "Deprecated. Has no effect anymore.")
-
 	numMemoryChunks = flag.Int("storage.local.memory-chunks", 1024*1024, "How many chunks to keep in memory. While the size of a chunk is 1kiB, the total memory usage will be significantly higher than this value * 1kiB. Furthermore, for various reasons, more chunks might have to be kept in memory temporarily.")

 	persistenceRetentionPeriod = flag.Duration("storage.local.retention", 15*24*time.Hour, "How long to retain samples in the local storage.")
-	persistenceQueueCapacity   = flag.Int("storage.local.persistence-queue-capacity", 1024*1024, "How many chunks can be waiting for being persisted before sample ingestion will stop. Many chunks waiting to be persisted will increase the checkpoint size.")
+	maxChunksToPersist         = flag.Int("storage.local.max-chunks-to-persist", 1024*1024, "How many chunks can be waiting for persistence before sample ingestion will stop. Many chunks waiting to be persisted will increase the checkpoint size.")

 	checkpointInterval         = flag.Duration("storage.local.checkpoint-interval", 5*time.Minute, "The period at which the in-memory metrics and the chunks not yet persisted to series files are checkpointed.")
 	checkpointDirtySeriesLimit = flag.Int("storage.local.checkpoint-dirty-series-limit", 5000, "If approx. that many time series are in a state that would require a recovery operation after a crash, a checkpoint is triggered, even if the checkpoint interval hasn't passed yet. A recovery operation requires a disk seek. The default limit intends to keep the recovery time below 1min even on spinning disks. With SSD, recovery is much faster, so you might want to increase this value in that case to avoid overly frequent checkpoints.")

@@ -91,9 +89,9 @@ func NewPrometheus() *prometheus {

 	o := &local.MemorySeriesStorageOptions{
 		MemoryChunks:               *numMemoryChunks,
+		MaxChunksToPersist:         *maxChunksToPersist,
 		PersistenceStoragePath:     *persistenceStoragePath,
 		PersistenceRetentionPeriod: *persistenceRetentionPeriod,
-		PersistenceQueueCapacity:   *persistenceQueueCapacity,
 		CheckpointInterval:         *checkpointInterval,
 		CheckpointDirtySeriesLimit: *checkpointDirtySeriesLimit,
 		Dirty:                      *storageDirty,
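Aside: the removed flag shows the tail end of a normal deprecation cycle — by this point storage.incoming-samples-queue-capacity was already a documented no-op. A hypothetical sketch of that intermediate no-op stage (not code from this commit):

    package main

    import (
    	"flag"
    	"log"
    )

    // Stage before removal (hypothetical reconstruction): the flag still
    // parses, but its value is ignored and setting it only warns.
    var samplesQueueCapacity = flag.Int("storage.incoming-samples-queue-capacity", 0,
    	"Deprecated. Has no effect anymore.")

    func main() {
    	flag.Parse()
    	if *samplesQueueCapacity != 0 {
    		log.Println("flag storage.incoming-samples-queue-capacity is deprecated and has no effect")
    	}
    }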
storage/local/persistence.go
@@ -627,7 +627,7 @@ func (p *persistence) checkpointSeriesMapAndHeads(fingerprintToSeries *seriesMap
 // unrecoverable error is encountered, it is returned. Call this method during
 // start-up while nothing else is running in storage land. This method is
 // utterly goroutine-unsafe.
-func (p *persistence) loadSeriesMapAndHeads() (sm *seriesMap, persistQueueLen int64, err error) {
+func (p *persistence) loadSeriesMapAndHeads() (sm *seriesMap, chunksToPersist int64, err error) {
 	var chunkDescsTotal int64
 	fingerprintToSeries := make(map[clientmodel.Fingerprint]*memorySeries)
 	sm = &seriesMap{m: fingerprintToSeries}

@@ -690,20 +690,20 @@ func (p *persistence) loadSeriesMapAndHeads() (sm *seriesMap, persistQueueLen in
 		if err != nil {
 			glog.Warning("Could not read series flags:", err)
 			p.dirty = true
-			return sm, persistQueueLen, nil
+			return sm, chunksToPersist, nil
 		}
 		headChunkPersisted := seriesFlags&flagHeadChunkPersisted != 0
 		fp, err := codable.DecodeUint64(r)
 		if err != nil {
 			glog.Warning("Could not decode fingerprint:", err)
 			p.dirty = true
-			return sm, persistQueueLen, nil
+			return sm, chunksToPersist, nil
 		}
 		var metric codable.Metric
 		if err := metric.UnmarshalFromReader(r); err != nil {
 			glog.Warning("Could not decode metric:", err)
 			p.dirty = true
-			return sm, persistQueueLen, nil
+			return sm, chunksToPersist, nil
 		}
 		var persistWatermark int64
 		if version != headsFormatLegacyVersion {

@@ -712,26 +712,26 @@ func (p *persistence) loadSeriesMapAndHeads() (sm *seriesMap, persistQueueLen in
 			if err != nil {
 				glog.Warning("Could not decode persist watermark:", err)
 				p.dirty = true
-				return sm, persistQueueLen, nil
+				return sm, chunksToPersist, nil
 			}
 		}
 		chunkDescsOffset, err := binary.ReadVarint(r)
 		if err != nil {
 			glog.Warning("Could not decode chunk descriptor offset:", err)
 			p.dirty = true
-			return sm, persistQueueLen, nil
+			return sm, chunksToPersist, nil
 		}
 		savedFirstTime, err := binary.ReadVarint(r)
 		if err != nil {
 			glog.Warning("Could not decode saved first time:", err)
 			p.dirty = true
-			return sm, persistQueueLen, nil
+			return sm, chunksToPersist, nil
 		}
 		numChunkDescs, err := binary.ReadVarint(r)
 		if err != nil {
 			glog.Warning("Could not decode number of chunk descriptors:", err)
 			p.dirty = true
-			return sm, persistQueueLen, nil
+			return sm, chunksToPersist, nil
 		}
 		chunkDescs := make([]*chunkDesc, numChunkDescs)
 		if version == headsFormatLegacyVersion {

@@ -748,13 +748,13 @@ func (p *persistence) loadSeriesMapAndHeads() (sm *seriesMap, persistQueueLen in
 				if err != nil {
 					glog.Warning("Could not decode first time:", err)
 					p.dirty = true
-					return sm, persistQueueLen, nil
+					return sm, chunksToPersist, nil
 				}
 				lastTime, err := binary.ReadVarint(r)
 				if err != nil {
 					glog.Warning("Could not decode last time:", err)
 					p.dirty = true
-					return sm, persistQueueLen, nil
+					return sm, chunksToPersist, nil
 				}
 				chunkDescs[i] = &chunkDesc{
 					chunkFirstTime: clientmodel.Timestamp(firstTime),

@@ -767,16 +767,16 @@ func (p *persistence) loadSeriesMapAndHeads() (sm *seriesMap, persistQueueLen in
 				if err != nil {
 					glog.Warning("Could not decode chunk type:", err)
 					p.dirty = true
-					return sm, persistQueueLen, nil
+					return sm, chunksToPersist, nil
 				}
 				chunk := newChunkForEncoding(chunkEncoding(encoding))
 				if err := chunk.unmarshal(r); err != nil {
 					glog.Warning("Could not decode chunk type:", err)
 					p.dirty = true
-					return sm, persistQueueLen, nil
+					return sm, chunksToPersist, nil
 				}
 				chunkDescs[i] = newChunkDesc(chunk)
-				persistQueueLen++
+				chunksToPersist++
 			}
 		}

@@ -789,7 +789,7 @@ func (p *persistence) loadSeriesMapAndHeads() (sm *seriesMap, persistQueueLen in
 			headChunkClosed: persistWatermark >= numChunkDescs,
 		}
 	}
-	return sm, persistQueueLen, nil
+	return sm, chunksToPersist, nil
 }

 // dropAndPersistChunks deletes all chunks from a series file whose last sample
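The hunks above all touch the same error-handling convention, which is why the rename recurs so often: on any decode error, loadSeriesMapAndHeads logs, marks the persistence layer dirty (so a crash-recovery pass runs later), and returns the partial result with a nil error rather than failing start-up. A condensed, self-contained sketch of that best-effort pattern, with hypothetical names:

    package main

    import (
    	"bufio"
    	"encoding/binary"
    	"io"
    	"log"
    	"strings"
    )

    type loader struct{ dirty bool }

    // load reads varints until EOF. On a decode error it keeps the partial
    // result, flags the loader dirty for a later recovery pass, and still
    // returns a nil error so start-up can proceed.
    func (l *loader) load(r *bufio.Reader) (loaded int64, err error) {
    	for {
    		if _, err := binary.ReadVarint(r); err != nil {
    			if err != io.EOF {
    				log.Println("could not decode value:", err)
    				l.dirty = true // recover from this later, not now
    			}
    			return loaded, nil
    		}
    		loaded++
    	}
    }

    func main() {
    	l := &loader{}
    	n, _ := l.load(bufio.NewReader(strings.NewReader("\x02\x04")))
    	log.Printf("loaded %d values, dirty=%v", n, l.dirty)
    }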
storage/local/storage.go
@@ -39,9 +39,14 @@ const (
 )

 var (
-	persistQueueLengthDesc = prometheus.NewDesc(
-		prometheus.BuildFQName(namespace, subsystem, "persist_queue_length"),
-		"The current number of chunks waiting in the persist queue.",
+	numChunksToPersistDesc = prometheus.NewDesc(
+		prometheus.BuildFQName(namespace, subsystem, "chunks_to_persist"),
+		"The current number of chunks waiting for persistence.",
+		nil, nil,
+	)
+	maxChunksToPersistDesc = prometheus.NewDesc(
+		prometheus.BuildFQName(namespace, subsystem, "max_chunks_to_persist"),
+		"The maximum number of chunks that can be waiting for persistence before sample ingestion will stop.",
 		nil, nil,
 	)
 )
@@ -61,10 +66,8 @@ type memorySeriesStorage struct {
 	checkpointInterval         time.Duration
 	checkpointDirtySeriesLimit int

-	persistQueueLen int64 // The number of chunks that need persistence.
-	persistQueueCap int   // If persistQueueLen reaches this threshold, ingestion will stall.
-	// Note that internally, the chunks to persist are not organized in a queue-like data structure,
-	// but handled in a more sophisticated way (see maintainMemorySeries).
+	numChunksToPersist int64 // The number of chunks waiting for persistence.
+	maxChunksToPersist int   // If numChunksToPersist reaches this threshold, ingestion will stall.

 	persistence *persistence

@@ -75,7 +78,6 @@ type memorySeriesStorage struct {
 	evictStopping, evictStopped chan struct{}

 	persistErrors        prometheus.Counter
-	persistQueueCapacity prometheus.Metric
 	numSeries            prometheus.Gauge
 	seriesOps            *prometheus.CounterVec
 	ingestedSamplesCount prometheus.Counter
@@ -87,9 +89,9 @@ type memorySeriesStorage struct {
 // values.
 type MemorySeriesStorageOptions struct {
 	MemoryChunks               int           // How many chunks to keep in memory.
+	MaxChunksToPersist         int           // Max number of chunks waiting to be persisted.
 	PersistenceStoragePath     string        // Location of persistence files.
 	PersistenceRetentionPeriod time.Duration // Chunks at least that old are dropped.
-	PersistenceQueueCapacity   int           // Capacity of queue for chunks to be persisted.
 	CheckpointInterval         time.Duration // How often to checkpoint the series map and head chunks.
 	CheckpointDirtySeriesLimit int           // How many dirty series will trigger an early checkpoint.
 	Dirty                      bool          // Force the storage to consider itself dirty on startup.
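Put together, the migration for an embedding caller means swapping PersistenceQueueCapacity for MaxChunksToPersist. A sketch of the final wiring, assembled from the main.go hunk above and the NewMemorySeriesStorage signature (error handling illustrative):

    o := &local.MemorySeriesStorageOptions{
    	MemoryChunks:               *numMemoryChunks,
    	MaxChunksToPersist:         *maxChunksToPersist, // was PersistenceQueueCapacity
    	PersistenceStoragePath:     *persistenceStoragePath,
    	PersistenceRetentionPeriod: *persistenceRetentionPeriod,
    	CheckpointInterval:         *checkpointInterval,
    	CheckpointDirtySeriesLimit: *checkpointDirtySeriesLimit,
    	Dirty:                      *storageDirty,
    }
    storage, err := local.NewMemorySeriesStorage(o)
    if err != nil {
    	glog.Fatal(err)
    }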
@@ -103,7 +105,7 @@ func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) (Storage, error) {
 		return nil, err
 	}
 	glog.Info("Loading series map and head chunks...")
-	fpToSeries, persistQueueLen, err := p.loadSeriesMapAndHeads()
+	fpToSeries, numChunksToPersist, err := p.loadSeriesMapAndHeads()
 	if err != nil {
 		return nil, err
 	}
@@ -127,9 +129,9 @@ func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) (Storage, error) {
 		checkpointInterval:         o.CheckpointInterval,
 		checkpointDirtySeriesLimit: o.CheckpointDirtySeriesLimit,

-		persistQueueLen: persistQueueLen,
-		persistQueueCap: o.PersistenceQueueCapacity,
+		maxChunksToPersist: o.MaxChunksToPersist,
+		numChunksToPersist: numChunksToPersist,
 		persistence: p,

 		countPersistedHeadChunks: make(chan struct{}, 100),

@@ -144,14 +146,6 @@ func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) (Storage, error) {
 			Name: "persist_errors_total",
 			Help: "The total number of errors while persisting chunks.",
 		}),
-		persistQueueCapacity: prometheus.MustNewConstMetric(
-			prometheus.NewDesc(
-				prometheus.BuildFQName(namespace, subsystem, "persist_queue_capacity"),
-				"The total capacity of the persist queue.",
-				nil, nil,
-			),
-			prometheus.GaugeValue, float64(o.PersistenceQueueCapacity),
-		),
 		numSeries: numSeries,
 		seriesOps: prometheus.NewCounterVec(
 			prometheus.CounterOpts{
@@ -337,12 +331,12 @@ func (s *memorySeriesStorage) GetMetricForFingerprint(fp clientmodel.Fingerprint

 // Append implements Storage.
 func (s *memorySeriesStorage) Append(sample *clientmodel.Sample) {
-	if s.getPersistQueueLen() >= s.persistQueueCap {
+	if s.getNumChunksToPersist() >= s.maxChunksToPersist {
 		glog.Warningf(
 			"%d chunks waiting for persistence, sample ingestion suspended.",
-			s.getPersistQueueLen(),
+			s.getNumChunksToPersist(),
 		)
-		for s.getPersistQueueLen() >= s.persistQueueCap {
+		for s.getNumChunksToPersist() >= s.maxChunksToPersist {
 			time.Sleep(time.Second)
 		}
 		glog.Warning("Sample ingestion resumed.")
@@ -356,7 +350,7 @@ func (s *memorySeriesStorage) Append(sample *clientmodel.Sample) {
 	})
 	s.fpLocker.Unlock(fp)
 	s.ingestedSamplesCount.Inc()
-	s.incPersistQueueLen(completedChunksCount)
+	s.incNumChunksToPersist(completedChunksCount)
 }

 func (s *memorySeriesStorage) getOrCreateSeries(fp clientmodel.Fingerprint, m clientmodel.Metric) *memorySeries {
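These two hunks are the consumer side of the renamed counter: Append stalls (polling once per second) while the number of chunks awaiting persistence is at or above the configured maximum, and every completed chunk bumps the counter. A self-contained sketch of the throttling pattern, with hypothetical names:

    package main

    import (
    	"log"
    	"sync/atomic"
    	"time"
    )

    type store struct {
    	pending int64 // chunks waiting for persistence; updated atomically
    	max     int64 // threshold at which ingestion stalls
    }

    // append blocks while too many chunks await persistence, mirroring the
    // throttling in memorySeriesStorage.Append above.
    func (s *store) append(v float64) {
    	if atomic.LoadInt64(&s.pending) >= s.max {
    		log.Printf("%d chunks waiting for persistence, sample ingestion suspended.",
    			atomic.LoadInt64(&s.pending))
    		for atomic.LoadInt64(&s.pending) >= s.max {
    			time.Sleep(time.Second)
    		}
    		log.Println("Sample ingestion resumed.")
    	}
    	// ... ingest v, then add the completed-chunk count to s.pending ...
    	_ = v
    }

    func main() {
    	s := &store{max: 2}
    	s.append(42) // does not block: pending (0) < max (2)
    }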
@@ -648,7 +642,7 @@ loop:
 			// possible. So only checkpoint if the persist
 			// queue is at most 90% full.
 			if dirtySeriesCount >= s.checkpointDirtySeriesLimit &&
-				s.getPersistQueueLen() < s.persistQueueCap*9/10 {
+				s.getNumChunksToPersist() < s.maxChunksToPersist*9/10 {
 				checkpointTimer.Reset(0)
 			}
 		}
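A small aside on the 90% check above: with Go's integer arithmetic the operand order matters, and multiplying before dividing (as the code does) truncates only once:

    package main

    import "fmt"

    func main() {
    	max := 15
    	fmt.Println(max * 9 / 10) // 13 -- multiply first, then truncate once
    	fmt.Println(max / 10 * 9) // 9  -- dividing first loses precision
    }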
@@ -709,7 +703,7 @@ func (s *memorySeriesStorage) maintainMemorySeries(
 	defer s.seriesOps.WithLabelValues(memoryMaintenance).Inc()

 	if series.maybeCloseHeadChunk() {
-		s.incPersistQueueLen(1)
+		s.incNumChunksToPersist(1)
 	}

 	seriesWasDirty := series.dirty
@@ -777,7 +771,7 @@ func (s *memorySeriesStorage) writeMemorySeries(
 		for _, cd := range cds {
 			cd.unpin(s.evictRequests)
 		}
-		s.incPersistQueueLen(-len(cds))
+		s.incNumChunksToPersist(-len(cds))
 		chunkOps.WithLabelValues(persistAndUnpin).Add(float64(len(cds)))
 	}()

@@ -878,15 +872,15 @@ func (s *memorySeriesStorage) loadChunkDescs(fp clientmodel.Fingerprint, beforeT
 	return s.persistence.loadChunkDescs(fp, beforeTime)
 }

-// getPersistQueueLen returns persistQueueLen in a goroutine-safe way.
-func (s *memorySeriesStorage) getPersistQueueLen() int {
-	return int(atomic.LoadInt64(&s.persistQueueLen))
+// getNumChunksToPersist returns numChunksToPersist in a goroutine-safe way.
+func (s *memorySeriesStorage) getNumChunksToPersist() int {
+	return int(atomic.LoadInt64(&s.numChunksToPersist))
 }

-// incPersistQueueLen increments persistQueueLen in a goroutine-safe way. Use a
+// incNumChunksToPersist increments numChunksToPersist in a goroutine-safe way. Use a
 // negative 'by' to decrement.
-func (s *memorySeriesStorage) incPersistQueueLen(by int) {
-	atomic.AddInt64(&s.persistQueueLen, int64(by))
+func (s *memorySeriesStorage) incNumChunksToPersist(by int) {
+	atomic.AddInt64(&s.numChunksToPersist, int64(by))
 }

 // Describe implements prometheus.Collector.
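The two accessors above are the entire concurrency story for the counter: a plain int64 field touched only through sync/atomic, so metrics collection and the throttling check in Append never race with writers on the hot ingestion path. The idiom in isolation:

    package main

    import (
    	"fmt"
    	"sync/atomic"
    )

    // counter is a minimal stand-in for the numChunksToPersist field: all
    // access goes through atomic operations, never through the field directly.
    type counter struct{ n int64 }

    func (c *counter) get() int   { return int(atomic.LoadInt64(&c.n)) }
    func (c *counter) inc(by int) { atomic.AddInt64(&c.n, int64(by)) } // negative 'by' decrements

    func main() {
    	var c counter
    	c.inc(3)
    	c.inc(-1)
    	fmt.Println(c.get()) // 2
    }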
@@ -894,8 +888,8 @@ func (s *memorySeriesStorage) Describe(ch chan<- *prometheus.Desc) {
 	s.persistence.Describe(ch)

 	ch <- s.persistErrors.Desc()
-	ch <- s.persistQueueCapacity.Desc()
-	ch <- persistQueueLengthDesc
+	ch <- maxChunksToPersistDesc
+	ch <- numChunksToPersistDesc
 	ch <- s.numSeries.Desc()
 	s.seriesOps.Describe(ch)
 	ch <- s.ingestedSamplesCount.Desc()
@@ -909,11 +903,15 @@ func (s *memorySeriesStorage) Collect(ch chan<- prometheus.Metric) {
 	s.persistence.Collect(ch)

 	ch <- s.persistErrors
-	ch <- s.persistQueueCapacity
 	ch <- prometheus.MustNewConstMetric(
-		persistQueueLengthDesc,
+		maxChunksToPersistDesc,
 		prometheus.GaugeValue,
-		float64(s.getPersistQueueLen()),
+		float64(s.maxChunksToPersist),
+	)
+	ch <- prometheus.MustNewConstMetric(
+		numChunksToPersistDesc,
+		prometheus.GaugeValue,
+		float64(s.getNumChunksToPersist()),
 	)
 	ch <- s.numSeries
 	s.seriesOps.Collect(ch)
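Both values are now emitted at scrape time via prometheus.MustNewConstMetric instead of being kept in a stored prometheus.Metric field — the usual client_golang pattern for values a collector can simply read when collected. A stand-alone sketch of the pattern (the metric name is illustrative, not the real FQName):

    package main

    import (
    	"fmt"

    	"github.com/prometheus/client_golang/prometheus"
    )

    var pendingDesc = prometheus.NewDesc(
    	"example_chunks_to_persist", // illustrative name
    	"The current number of chunks waiting for persistence.",
    	nil, nil,
    )

    type storageCollector struct{ pending func() int }

    func (c storageCollector) Describe(ch chan<- *prometheus.Desc) { ch <- pendingDesc }

    func (c storageCollector) Collect(ch chan<- prometheus.Metric) {
    	// The value is read fresh at scrape time; no Gauge has to be kept in sync.
    	ch <- prometheus.MustNewConstMetric(pendingDesc, prometheus.GaugeValue, float64(c.pending()))
    }

    func main() {
    	ch := make(chan prometheus.Metric, 1)
    	storageCollector{pending: func() int { return 42 }}.Collect(ch)
    	fmt.Println((<-ch).Desc()) // prints the Desc of example_chunks_to_persist
    }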
storage/local/storage_test.go
@@ -157,9 +157,9 @@ func TestLoop(t *testing.T) {
 	defer directory.Close()
 	o := &MemorySeriesStorageOptions{
 		MemoryChunks:               50,
+		MaxChunksToPersist:         1000000,
 		PersistenceRetentionPeriod: 24 * 7 * time.Hour,
 		PersistenceStoragePath:     directory.Path(),
-		PersistenceQueueCapacity:   1000000,
 		CheckpointInterval:         250 * time.Millisecond,
 	}
 	storage, err := NewMemorySeriesStorage(o)
@@ -669,9 +669,9 @@ func benchmarkFuzz(b *testing.B, encoding chunkEncoding) {
 	defer directory.Close()
 	o := &MemorySeriesStorageOptions{
 		MemoryChunks:               100,
+		MaxChunksToPersist:         1000000,
 		PersistenceRetentionPeriod: time.Hour,
 		PersistenceStoragePath:     directory.Path(),
-		PersistenceQueueCapacity:   1000000,
 		CheckpointInterval:         time.Second,
 	}
 	s, err := NewMemorySeriesStorage(o)
storage/local/test_helpers.go
@@ -42,9 +42,9 @@ func NewTestStorage(t test.T, encoding chunkEncoding) (Storage, test.Closer) {
 	directory := test.NewTemporaryDirectory("test_storage", t)
 	o := &MemorySeriesStorageOptions{
 		MemoryChunks:               1000000,
+		MaxChunksToPersist:         1000000,
 		PersistenceRetentionPeriod: 24 * time.Hour * 365 * 100, // Enough to never trigger purging.
 		PersistenceStoragePath:     directory.Path(),
-		PersistenceQueueCapacity:   1000000,
 		CheckpointInterval:         time.Hour,
 	}
 	storage, err := NewMemorySeriesStorage(o)
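Note that all three test fixtures set MaxChunksToPersist to 1000000, carrying over the old PersistenceQueueCapacity value, so the ingestion throttling in Append never triggers during tests and benchmarks.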