Fix bugs in chunk evict code.

Also, simplify code by re-looking up metric in metric map.

Change-Id: Ib2092f9184374e5a543e87d3a9f4a74fda64b193
This commit is contained in:
Bjoern Rabenstein 2014-10-06 16:32:08 +02:00
parent 7e6a03fbf9
commit fcdf5a8ee7
2 changed files with 15 additions and 19 deletions

View file

@ -21,12 +21,6 @@ import (
// SeriesMap maps fingerprints to memory series. // SeriesMap maps fingerprints to memory series.
type SeriesMap map[clientmodel.Fingerprint]*memorySeries type SeriesMap map[clientmodel.Fingerprint]*memorySeries
// FingerprintSeriesPair is a fingerprint paired with a memory series.
type FingerprintSeriesPair struct {
Fingerprint clientmodel.Fingerprint
Series *memorySeries
}
// Storage ingests and manages samples, along with various indexes. All methods // Storage ingests and manages samples, along with various indexes. All methods
// are goroutine-safe. // are goroutine-safe.
type Storage interface { type Storage interface {

View file

@ -201,7 +201,7 @@ func (s *memorySeriesStorage) NewIterator(fp clientmodel.Fingerprint) SeriesIter
} }
func (s *memorySeriesStorage) evictMemoryChunks(ttl time.Duration) { func (s *memorySeriesStorage) evictMemoryChunks(ttl time.Duration) {
fspsToArchive := []FingerprintSeriesPair{} fpsToArchive := []clientmodel.Fingerprint{}
defer func(begin time.Time) { defer func(begin time.Time) {
evictionDuration.Set(float64(time.Since(begin) / time.Millisecond)) evictionDuration.Set(float64(time.Since(begin) / time.Millisecond))
@ -210,16 +210,13 @@ func (s *memorySeriesStorage) evictMemoryChunks(ttl time.Duration) {
s.mtx.RLock() s.mtx.RLock()
for fp, series := range s.fingerprintToSeries { for fp, series := range s.fingerprintToSeries {
if series.evictOlderThan(clientmodel.TimestampFromTime(time.Now()).Add(-1 * ttl)) { if series.evictOlderThan(clientmodel.TimestampFromTime(time.Now()).Add(-1 * ttl)) {
fspsToArchive = append(fspsToArchive, FingerprintSeriesPair{ fpsToArchive = append(fpsToArchive, fp)
Fingerprint: fp,
Series: series,
})
}
series.persistHeadChunk(fp, s.persistQueue) series.persistHeadChunk(fp, s.persistQueue)
} }
}
s.mtx.RUnlock() s.mtx.RUnlock()
if len(fspsToArchive) == 0 { if len(fpsToArchive) == 0 {
return return
} }
@ -227,19 +224,24 @@ func (s *memorySeriesStorage) evictMemoryChunks(ttl time.Duration) {
s.mtx.Lock() s.mtx.Lock()
defer s.mtx.Unlock() defer s.mtx.Unlock()
for _, fsp := range fspsToArchive { for _, fp := range fpsToArchive {
series, ok := s.fingerprintToSeries[fp]
if !ok {
// Oops, perhaps another evict run happening in parallel?
continue
}
// TODO: Need series lock (or later FP lock)? // TODO: Need series lock (or later FP lock)?
if !fsp.Series.headChunkPersisted { if !series.headChunkPersisted {
// Oops. The series has received new samples all of a // Oops. The series has received new samples all of a
// sudden, giving it a new head chunk. Leave it alone. // sudden, giving it a new head chunk. Leave it alone.
return continue
} }
if err := s.persistence.ArchiveMetric( if err := s.persistence.ArchiveMetric(
fsp.Fingerprint, fsp.Series.metric, fsp.Series.firstTime(), fsp.Series.lastTime(), fp, series.metric, series.firstTime(), series.lastTime(),
); err != nil { ); err != nil {
glog.Errorf("Error archiving metric %v: %v", fsp.Series.metric, err) glog.Errorf("Error archiving metric %v: %v", series.metric, err)
} }
delete(s.fingerprintToSeries, fsp.Fingerprint) delete(s.fingerprintToSeries, fp)
} }
} }