Merge pull request #563 from prometheus/beorn7/fix

Some minor tweaks and bug fixes.
This commit is contained in:
Björn Rabenstein 2015-02-27 13:17:55 +01:00
commit 32f7825c35
2 changed files with 14 additions and 9 deletions

View file

@ -22,7 +22,6 @@ import (
"path"
"path/filepath"
"sync"
"sync/atomic"
"time"
"github.com/golang/glog"
@ -586,7 +585,7 @@ func (p *persistence) checkpointSeriesMapAndHeads(fingerprintToSeries *seriesMap
// this method during start-up while nothing else is running in storage
// land. This method is utterly goroutine-unsafe.
func (p *persistence) loadSeriesMapAndHeads() (sm *seriesMap, err error) {
var chunksTotal, chunkDescsTotal int64
var chunkDescsTotal int64
fingerprintToSeries := make(map[clientmodel.Fingerprint]*memorySeries)
sm = &seriesMap{m: fingerprintToSeries}
@ -599,7 +598,6 @@ func (p *persistence) loadSeriesMapAndHeads() (sm *seriesMap, err error) {
}
}
if err == nil {
atomic.AddInt64(&numMemChunks, chunksTotal)
numMemChunkDescs.Add(float64(chunkDescsTotal))
}
}()
@ -682,7 +680,6 @@ func (p *persistence) loadSeriesMapAndHeads() (sm *seriesMap, err error) {
return sm, nil
}
chunkDescs := make([]*chunkDesc, numChunkDescs)
chunkDescsTotal += numChunkDescs
for i := int64(0); i < numChunkDescs; i++ {
if headChunkPersisted || i < numChunkDescs-1 {
@ -704,7 +701,6 @@ func (p *persistence) loadSeriesMapAndHeads() (sm *seriesMap, err error) {
}
} else {
// Non-persisted head chunk.
chunksTotal++
chunkType, err := r.ReadByte()
if err != nil {
glog.Warning("Could not decode chunk type:", err)
@ -721,6 +717,13 @@ func (p *persistence) loadSeriesMapAndHeads() (sm *seriesMap, err error) {
}
}
chunkDescsTotal += numChunkDescs
if !headChunkPersisted {
// In this case, we have created a chunkDesc with
// newChunkDesc, which will count itself automatically.
// Correct for that by decrementing the count.
chunkDescsTotal--
}
fingerprintToSeries[clientmodel.Fingerprint(fp)] = &memorySeries{
metric: clientmodel.Metric(metric),
chunkDescs: chunkDescs,

View file

@ -154,7 +154,7 @@ func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) (Storage, error) {
persistStopped: make(chan struct{}),
persistence: p,
countPersistedHeadChunks: make(chan struct{}, 1024),
countPersistedHeadChunks: make(chan struct{}, 100),
evictList: list.New(),
evictRequests: make(chan evictRequest, evictRequestsCap),
@ -915,7 +915,7 @@ func (s *memorySeriesStorage) maintainMemorySeries(fp clientmodel.Fingerprint, b
// If we are here, the series is not archived, so check for chunkDesc
// eviction next and then if the head chunk needs to be persisted.
series.evictChunkDescs(iOldestNotEvicted)
if !series.headChunkPersisted && time.Now().Sub(series.head().firstTime().Time()) > headChunkTimeout {
if !series.headChunkPersisted && time.Now().Sub(series.head().lastTime().Time()) > headChunkTimeout {
series.headChunkPersisted = true
// Since we cannot modify the head chunk from now on, we
// don't need to bother with cloning anymore.
@ -1027,8 +1027,10 @@ func (s *memorySeriesStorage) Collect(ch chan<- prometheus.Metric) {
ch <- s.ingestedSamplesCount
ch <- s.invalidPreloadRequestsCount
count := atomic.LoadInt64(&numMemChunks)
ch <- prometheus.MustNewConstMetric(numMemChunksDesc, prometheus.GaugeValue, float64(count))
ch <- prometheus.MustNewConstMetric(
numMemChunksDesc,
prometheus.GaugeValue,
float64(atomic.LoadInt64(&numMemChunks)))
}
// chunkMaps is a slice of maps with chunkDescs to be persisted.