Mirror of https://github.com/prometheus/prometheus.git, synced 2024-11-10 23:54:05 -08:00

Merge pull request #559 from prometheus/beorn7/fix

Fix the embarrassing bug introduced in commit 0851945.

This commit is contained in: commit f8cb25e932
@@ -78,8 +78,8 @@ func (p *persistence) recoverFromCrash(fingerprintToSeries map[clientmodel.Finge
             // Oops, head chunk was persisted, but nothing on disk.
             // Thus, we lost that series completely. Clean up the remnants.
             delete(fingerprintToSeries, fp)
-            if err := p.dropArchivedMetric(fp); err != nil {
-                // Dropping the archived metric didn't work, so try
+            if err := p.purgeArchivedMetric(fp); err != nil {
+                // Purging the archived metric didn't work, so try
                 // to unindex it, just in case it's in the indexes.
                 p.unindexMetric(fp, s.metric)
             }
@@ -67,7 +67,7 @@ const (
     unpin     = "unpin" // Excluding the unpin on persisting.
     clone     = "clone"
     transcode = "transcode"
-    purge     = "purge"
+    drop      = "drop"

     // Op-types for chunkOps and chunkDescOps.
     evict = "evict"
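These op-type strings are used as label values on the chunkOps counter vector that the following hunks increment. A minimal, self-contained sketch of that pattern using the client_golang API; the metric name and help text are illustrative assumptions, not taken from this diff:

package main

import "github.com/prometheus/client_golang/prometheus"

const drop = "drop" // the op-type introduced by the rename above

// chunkOps counts chunk operations, partitioned by operation type.
var chunkOps = prometheus.NewCounterVec(
    prometheus.CounterOpts{
        Name: "example_chunk_ops_total", // illustrative name only
        Help: "Total number of chunk operations, partitioned by type.",
    },
    []string{"type"},
)

func main() {
    prometheus.MustRegister(chunkOps)
    chunkOps.WithLabelValues(drop).Add(3) // e.g. three chunks dropped in one sweep
}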
@@ -771,7 +771,7 @@ func (p *persistence) dropChunks(fp clientmodel.Fingerprint, beforeTime clientmo
         if err == io.EOF {
             // We ran into the end of the file without finding any chunks that should
             // be kept. Remove the whole file.
-            chunkOps.WithLabelValues(purge).Add(float64(i))
+            chunkOps.WithLabelValues(drop).Add(float64(i))
             if err := os.Remove(f.Name()); err != nil {
                 return 0, 0, true, err
             }
@@ -783,7 +783,7 @@ func (p *persistence) dropChunks(fp clientmodel.Fingerprint, beforeTime clientmo
         lastTime := clientmodel.Timestamp(binary.LittleEndian.Uint64(timeBuf[8:]))
         if !lastTime.Before(beforeTime) {
             firstTime = clientmodel.Timestamp(binary.LittleEndian.Uint64(timeBuf))
-            chunkOps.WithLabelValues(purge).Add(float64(i))
+            chunkOps.WithLabelValues(drop).Add(float64(i))
             break
         }
     }
@@ -824,7 +824,7 @@ func (p *persistence) indexMetric(fp clientmodel.Fingerprint, m clientmodel.Metr
 // indexes used for getFingerprintsForLabelPair, getLabelValuesForLabelName, and
 // getFingerprintsModifiedBefore. The index of fingerprints to archived metrics
 // is not affected by this removal. (In fact, never call this method for an
-// archived metric. To drop an archived metric, call dropArchivedFingerprint.)
+// archived metric. To purge an archived metric, call purgeArchivedFingerprint.)
 // If the queue is full, this method blocks until the metric can be queued. This
 // method is goroutine-safe.
 func (p *persistence) unindexMetric(fp clientmodel.Fingerprint, m clientmodel.Metric) {
@@ -910,11 +910,11 @@ func (p *persistence) getArchivedMetric(fp clientmodel.Fingerprint) (clientmodel
     return metric, err
 }

-// dropArchivedMetric deletes an archived fingerprint and its corresponding
+// purgeArchivedMetric deletes an archived fingerprint and its corresponding
 // metric entirely. It also queues the metric for un-indexing (no need to call
-// unindexMetric for the deleted metric.) The caller must have locked the
-// fingerprint.
-func (p *persistence) dropArchivedMetric(fp clientmodel.Fingerprint) (err error) {
+// unindexMetric for the deleted metric.) It does not touch the series file,
+// though. The caller must have locked the fingerprint.
+func (p *persistence) purgeArchivedMetric(fp clientmodel.Fingerprint) (err error) {
     defer func() {
         if err != nil {
             p.setDirty(true)
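The doc comment above states a locking contract rather than showing one. A self-contained sketch of a caller honoring it, with stand-in types (the real fpLocker shards locks by fingerprint; everything here is illustrative):

package main

import (
    "fmt"
    "sync"
)

type fingerprint uint64

// fpLocker stands in for the real per-fingerprint locker.
type fpLocker struct{ mtx sync.Mutex }

func (l *fpLocker) Lock(fp fingerprint)   { l.mtx.Lock() }
func (l *fpLocker) Unlock(fp fingerprint) { l.mtx.Unlock() }

type persistence struct{}

// purgeArchivedMetric is a stub; the real method deletes the archived
// fingerprint and metric and queues the metric for un-indexing.
func (p *persistence) purgeArchivedMetric(fp fingerprint) error { return nil }

func main() {
    var (
        locker fpLocker
        p      persistence
        fp     fingerprint = 42
    )
    // Per the doc comment, the caller must hold the fingerprint lock.
    locker.Lock(fp)
    defer locker.Unlock(fp)
    if err := p.purgeArchivedMetric(fp); err != nil {
        fmt.Println("error purging archived metric:", err)
    }
}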
@@ -944,7 +944,7 @@ func (p *persistence) dropArchivedMetric(fp clientmodel.Fingerprint) (err error)
 }

 // unarchiveMetric deletes an archived fingerprint and its metric, but (in
-// contrast to dropArchivedMetric) does not un-index the metric. If a metric
+// contrast to purgeArchivedMetric) does not un-index the metric. If a metric
 // was actually deleted, the method returns true and the first time of the
 // deleted metric. The caller must have locked the fingerprint.
 func (p *persistence) unarchiveMetric(fp clientmodel.Fingerprint) (
@@ -349,11 +349,11 @@ func TestDropArchivedMetric(t *testing.T) {
         t.Error("want FP 2 archived")
     }

-    if err != p.dropArchivedMetric(1) {
+    if err != p.purgeArchivedMetric(1) {
         t.Fatal(err)
     }
-    if err != p.dropArchivedMetric(3) {
-        // Dropping something that has not beet archived is not an error.
+    if err != p.purgeArchivedMetric(3) {
+        // Purging something that has not beet archived is not an error.
         t.Fatal(err)
     }
     p.waitForIndexing()
@@ -242,11 +242,11 @@ func (s *memorySeries) evictChunkDescs(iOldestNotEvicted int) {
     }
 }

-// purgeOlderThan removes chunkDescs older than t. It returns the number of
-// purged chunkDescs and true if all chunkDescs have been purged.
+// dropChunks removes chunkDescs older than t. It returns the number of dropped
+// chunkDescs and true if all chunkDescs have been dropped.
 //
 // The caller must have locked the fingerprint of the series.
-func (s *memorySeries) purgeOlderThan(t clientmodel.Timestamp) (int, bool) {
+func (s *memorySeries) dropChunks(t clientmodel.Timestamp) (int, bool) {
     keepIdx := len(s.chunkDescs)
     for i, cd := range s.chunkDescs {
         if !cd.lastTime().Before(t) {
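The hunk above cuts off inside the renamed method. A self-contained sketch of how such a dropChunks routine can be completed, with stand-in types; the loop body past the shown lines is an assumption, not part of this diff:

package main

import "fmt"

type timestamp int64

func (t timestamp) Before(o timestamp) bool { return t < o }

type chunkDesc struct{ last timestamp }

func (cd *chunkDesc) lastTime() timestamp { return cd.last }

type memorySeries struct{ chunkDescs []*chunkDesc }

// dropChunks removes chunkDescs whose last sample is older than t. It returns
// the number dropped and whether none remain, mirroring the renamed method.
func (s *memorySeries) dropChunks(t timestamp) (int, bool) {
    keepIdx := len(s.chunkDescs)
    for i, cd := range s.chunkDescs {
        if !cd.lastTime().Before(t) {
            keepIdx = i // first chunkDesc that is still young enough
            break
        }
    }
    s.chunkDescs = s.chunkDescs[keepIdx:]
    return keepIdx, len(s.chunkDescs) == 0
}

func main() {
    s := &memorySeries{chunkDescs: []*chunkDesc{{5}, {10}, {15}}}
    n, all := s.dropChunks(12)
    fmt.Println(n, all) // 2 false: the chunks ending at 5 and 10 are dropped
}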
@@ -68,7 +68,7 @@ type memorySeriesStorage struct {

     loopStopping, loopStopped  chan struct{}
     maxMemoryChunks            int
-    purgeAfter                 time.Duration
+    dropAfter                  time.Duration
     checkpointInterval         time.Duration
     checkpointDirtySeriesLimit int
@@ -96,7 +96,6 @@ type memorySeriesStorage struct {
     seriesOps                   *prometheus.CounterVec
     ingestedSamplesCount        prometheus.Counter
     invalidPreloadRequestsCount prometheus.Counter
-    purgeDuration               prometheus.Gauge
 }

 // MemorySeriesStorageOptions contains options needed by
@@ -105,7 +104,7 @@ type memorySeriesStorage struct {
 type MemorySeriesStorageOptions struct {
     MemoryChunks               int           // How many chunks to keep in memory.
     PersistenceStoragePath     string        // Location of persistence files.
-    PersistenceRetentionPeriod time.Duration // Chunks at least that old are purged.
+    PersistenceRetentionPeriod time.Duration // Chunks at least that old are dropped.
     PersistenceQueueCapacity   int           // Capacity of queue for chunks to be persisted.
     CheckpointInterval         time.Duration // How often to checkpoint the series map and head chunks.
     CheckpointDirtySeriesLimit int           // How many dirty series will trigger an early checkpoint.
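For orientation, a sketch of how an embedding program might fill in these options. The struct re-declaration mirrors the fields shown above so the snippet compiles on its own; all values are illustrative:

package main

import "time"

// MemorySeriesStorageOptions mirrors the fields in the hunk above.
type MemorySeriesStorageOptions struct {
    MemoryChunks               int
    PersistenceStoragePath     string
    PersistenceRetentionPeriod time.Duration
    PersistenceQueueCapacity   int
    CheckpointInterval         time.Duration
    CheckpointDirtySeriesLimit int
}

func main() {
    _ = MemorySeriesStorageOptions{
        MemoryChunks:               1024 * 1024,
        PersistenceStoragePath:     "/tmp/prometheus",
        PersistenceRetentionPeriod: 15 * 24 * time.Hour, // chunks at least this old are dropped
        PersistenceQueueCapacity:   32 * 1024,
        CheckpointInterval:         5 * time.Minute,
        CheckpointDirtySeriesLimit: 5000,
    }
}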
@@ -140,7 +139,7 @@ func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) (Storage, error) {
         loopStopping:               make(chan struct{}),
         loopStopped:                make(chan struct{}),
         maxMemoryChunks:            o.MemoryChunks,
-        purgeAfter:                 o.PersistenceRetentionPeriod,
+        dropAfter:                  o.PersistenceRetentionPeriod,
         checkpointInterval:         o.CheckpointInterval,
         checkpointDirtySeriesLimit: o.CheckpointDirtySeriesLimit,
@@ -670,8 +669,8 @@ func (s *memorySeriesStorage) persistChunks(fp clientmodel.Fingerprint, cds []*c

 // waitForNextFP waits an estimated duration, after which we want to process
 // another fingerprint so that we will process all fingerprints in a tenth of
-// s.purgeAfter assuming that the system is doing nothing else, e.g. if we want
-// to purge after 40h, we want to cycle through all fingerprints within
+// s.dropAfter assuming that the system is doing nothing else, e.g. if we want
+// to drop chunks after 40h, we want to cycle through all fingerprints within
 // 4h. However, the maximum sweep time is capped at fpMaxSweepTime. Furthermore,
 // this method will always wait for at least fpMinWaitDuration and never longer
 // than fpMaxWaitDuration. If s.loopStopped is closed, it will return false
@@ -680,7 +679,7 @@ func (s *memorySeriesStorage) persistChunks(fp clientmodel.Fingerprint, cds []*c
 func (s *memorySeriesStorage) waitForNextFP(numberOfFPs int) bool {
     d := fpMaxWaitDuration
     if numberOfFPs != 0 {
-        sweepTime := s.purgeAfter / 10
+        sweepTime := s.dropAfter / 10
         if sweepTime > fpMaxSweepTime {
             sweepTime = fpMaxSweepTime
         }
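The doc comment two hunks up pins down the wait computation except for how the sweep time is spread over the fingerprints. A self-contained sketch assuming an even division; the constant values are illustrative, not taken from this diff:

package main

import (
    "fmt"
    "time"
)

// Illustrative stand-ins for the package-level constants.
const (
    fpMinWaitDuration = 10 * time.Millisecond
    fpMaxWaitDuration = 10 * time.Second
    fpMaxSweepTime    = 6 * time.Hour
)

// waitDuration spreads a tenth of dropAfter over all fingerprints, capped at
// fpMaxSweepTime and bounded by fpMinWaitDuration and fpMaxWaitDuration.
func waitDuration(numberOfFPs int, dropAfter time.Duration) time.Duration {
    d := fpMaxWaitDuration
    if numberOfFPs != 0 {
        sweepTime := dropAfter / 10
        if sweepTime > fpMaxSweepTime {
            sweepTime = fpMaxSweepTime
        }
        d = sweepTime / time.Duration(numberOfFPs)
        if d > fpMaxWaitDuration {
            d = fpMaxWaitDuration
        }
    }
    if d < fpMinWaitDuration {
        d = fpMinWaitDuration
    }
    return d
}

func main() {
    // Dropping after 40h gives a 4h sweep target, as in the comment's example;
    // spread over 1000 fingerprints that is 14.4s, capped to fpMaxWaitDuration.
    fmt.Println(waitDuration(1000, 40*time.Hour)) // 10s
}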
@@ -725,6 +724,7 @@ func (s *memorySeriesStorage) cycleThroughMemoryFingerprints() chan clientmodel.
             }
             begin := time.Now()
             fpIter = s.fpToSeries.fpIter()
+            count := 0
             for fp := range fpIter {
                 select {
                 case memoryFingerprints <- fp:
@@ -732,8 +732,14 @@ func (s *memorySeriesStorage) cycleThroughMemoryFingerprints() chan clientmodel.
                     return
                 }
                 s.waitForNextFP(s.fpToSeries.length())
+                count++
+            }
+            if count > 0 {
+                glog.Infof(
+                    "Completed maintenance sweep through %d in-memory fingerprints in %v.",
+                    count, time.Since(begin),
+                )
             }
-            glog.Infof("Completed maintenance sweep through in-memory fingerprints in %v.", time.Since(begin))
         }
     }()

@@ -750,7 +756,7 @@ func (s *memorySeriesStorage) cycleThroughArchivedFingerprints() chan clientmode

         for {
             archivedFPs, err := s.persistence.getFingerprintsModifiedBefore(
-                clientmodel.TimestampFromTime(time.Now()).Add(-1 * s.purgeAfter),
+                clientmodel.TimestampFromTime(time.Now()).Add(-s.dropAfter),
             )
             if err != nil {
                 glog.Error("Failed to lookup archived fingerprint ranges: ", err)
@@ -770,7 +776,12 @@ func (s *memorySeriesStorage) cycleThroughArchivedFingerprints() chan clientmode
                 }
                 s.waitForNextFP(len(archivedFPs))
             }
-            glog.Infof("Completed maintenance sweep through archived fingerprints in %v.", time.Since(begin))
+            if len(archivedFPs) > 0 {
+                glog.Infof(
+                    "Completed maintenance sweep through %d archived fingerprints in %v.",
+                    len(archivedFPs), time.Since(begin),
+                )
+            }
         }
     }()
     return archivedFingerprints
@@ -807,11 +818,9 @@ loop:
             headChunksPersistedSinceLastCheckpoint = 0
             checkpointTimer.Reset(s.checkpointInterval)
         case fp := <-memoryFingerprints:
-            s.purgeSeries(fp, clientmodel.TimestampFromTime(time.Now()).Add(-1*s.purgeAfter))
-            s.seriesOps.WithLabelValues(memoryMaintenance).Inc()
+            s.maintainMemorySeries(fp, clientmodel.TimestampFromTime(time.Now()).Add(-s.dropAfter))
         case fp := <-archivedFingerprints:
-            s.purgeSeries(fp, clientmodel.TimestampFromTime(time.Now()).Add(-1*s.purgeAfter))
-            s.seriesOps.WithLabelValues(archiveMaintenance).Inc()
+            s.maintainArchivedSeries(fp, clientmodel.TimestampFromTime(time.Now()).Add(-s.dropAfter))
         case <-s.countPersistedHeadChunks:
             headChunksPersistedSinceLastCheckpoint++
             // Check if we have enough "dirty" series so that we need an early checkpoint.
@@ -835,10 +844,11 @@ loop:
     }
 }

-// maintainSeries closes the head chunk if not touched in a while. It archives a
-// series if all chunks are evicted. It evicts chunkDescs if there are too
-// many.
-func (s *memorySeriesStorage) maintainSeries(fp clientmodel.Fingerprint) {
+// maintainMemorySeries first purges the series from old chunks. If the series
+// still exists after that, it proceeds with the following steps: It closes the
+// head chunk if it was not touched in a while. It archives a series if all
+// chunks are evicted. It evicts chunkDescs if there are too many.
+func (s *memorySeriesStorage) maintainMemorySeries(fp clientmodel.Fingerprint, beforeTime clientmodel.Timestamp) {
     var headChunkToPersist *chunkDesc
     s.fpLocker.Lock(fp)
     defer func() {
@@ -846,19 +856,28 @@ func (s *memorySeriesStorage) maintainSeries(fp clientmodel.Fingerprint) {
         // Queue outside of lock!
         if headChunkToPersist != nil {
             s.persistQueue <- persistRequest{fp, headChunkToPersist}
-        }
             // Count that a head chunk was persisted, but only best effort, i.e. we
             // don't want to block here.
             select {
             case s.countPersistedHeadChunks <- struct{}{}: // Counted.
             default: // Meh...
             }
+        }
     }()

     series, ok := s.fpToSeries.get(fp)
     if !ok {
+        // Series is actually not in memory, perhaps archived or dropped in the meantime.
         return
     }

+    defer s.seriesOps.WithLabelValues(memoryMaintenance).Inc()
+
+    if s.purgeMemorySeries(fp, series, beforeTime) {
+        // Series is gone now, we are done.
+        return
+    }
+
     iOldestNotEvicted := -1
     for i, cd := range series.chunkDescs {
         if !cd.isEvicted() {
@@ -876,7 +895,10 @@ func (s *memorySeriesStorage) maintainSeries(fp clientmodel.Fingerprint) {
     if len(series.chunkDescs) == 0 {
         cds, err := s.loadChunkDescs(fp, clientmodel.Latest)
         if err != nil {
-            glog.Errorf("Could not load chunk descriptors prior to archiving metric %v, metric will not be archived: %v", series.metric, err)
+            glog.Errorf(
+                "Could not load chunk descriptors prior to archiving metric %v, metric will not be archived: %v",
+                series.metric, err,
+            )
             return
         }
         series.chunkDescs = cds
@@ -902,38 +924,43 @@ func (s *memorySeriesStorage) maintainSeries(fp clientmodel.Fingerprint) {
     }
 }

-// purgeSeries purges chunks older than beforeTime from a series. If the series
-// contains no chunks after the purge, it is dropped entirely.
-func (s *memorySeriesStorage) purgeSeries(fp clientmodel.Fingerprint, beforeTime clientmodel.Timestamp) {
-    s.fpLocker.Lock(fp)
-    defer s.fpLocker.Unlock(fp)
+// purgeMemorySeries drops chunks older than beforeTime from the provided memory
+// series. The caller must have locked fp. If the series contains no chunks
+// after dropping old chunks, it is purged entirely. In that case, the method
+// returns true.
+func (s *memorySeriesStorage) purgeMemorySeries(fp clientmodel.Fingerprint, series *memorySeries, beforeTime clientmodel.Timestamp) bool {

-    if series, ok := s.fpToSeries.get(fp); ok {
-        // Deal with series in memory.
     if !series.firstTime().Before(beforeTime) {
         // Oldest sample not old enough.
-        return
+        return false
     }
-    newFirstTime, numDropped, allDropped, err := s.persistence.dropChunks(fp, beforeTime)
+    newFirstTime, numDroppedFromPersistence, allDroppedFromPersistence, err := s.persistence.dropChunks(fp, beforeTime)
     if err != nil {
-        glog.Error("Error purging persisted chunks: ", err)
+        glog.Error("Error dropping persisted chunks: ", err)
     }
-    numPurged, allPurged := series.purgeOlderThan(beforeTime)
-    if allPurged && allDropped {
+    numDroppedFromMemory, allDroppedFromMemory := series.dropChunks(beforeTime)
+    if allDroppedFromPersistence && allDroppedFromMemory {
         s.fpToSeries.del(fp)
         s.numSeries.Dec()
         s.seriesOps.WithLabelValues(memoryPurge).Inc()
         s.persistence.unindexMetric(fp, series.metric)
-    } else if series.chunkDescsOffset != -1 {
+        return true
+    }
+    if series.chunkDescsOffset != -1 {
         series.savedFirstTime = newFirstTime
-        series.chunkDescsOffset += numPurged - numDropped
+        series.chunkDescsOffset += numDroppedFromMemory - numDroppedFromPersistence
         if series.chunkDescsOffset < 0 {
             panic("dropped more chunks from persistence than from memory")
         }
     }
-    return
+    return false
 }
-    // Deal with archived series.
+
+// maintainArchivedSeries drops chunks older than beforeTime from an archived
+// series. If the series contains no chunks after that, it is purged entirely.
+func (s *memorySeriesStorage) maintainArchivedSeries(fp clientmodel.Fingerprint, beforeTime clientmodel.Timestamp) {
+    s.fpLocker.Lock(fp)
+    defer s.fpLocker.Unlock(fp)
+
     has, firstTime, lastTime, err := s.persistence.hasArchivedMetric(fp)
     if err != nil {
         glog.Error("Error looking up archived time range: ", err)
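The offset arithmetic in this hunk is easy to misread, so here is a worked example. The scenario is invented for illustration; only the formula and the non-negativity invariant come from the diff:

package main

import "fmt"

func main() {
    // chunkDescsOffset counts the chunks in the series file that have no
    // chunkDesc loaded in memory. Suppose the file holds 10 chunks and memory
    // holds chunkDescs for the last 4, so the offset is 6. Dropping everything
    // older than beforeTime then removes, say, 7 chunks from the file but only
    // 1 chunkDesc from memory:
    offset := 6
    numDroppedFromMemory := 1
    numDroppedFromPersistence := 7
    offset += numDroppedFromMemory - numDroppedFromPersistence
    fmt.Println(offset) // 0; a negative result would trigger the panic above
}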
@@ -944,13 +971,15 @@ func (s *memorySeriesStorage) purgeSeries(fp clientmodel.Fingerprint, beforeTime
         return
     }

+    defer s.seriesOps.WithLabelValues(archiveMaintenance).Inc()
+
     newFirstTime, _, allDropped, err := s.persistence.dropChunks(fp, beforeTime)
     if err != nil {
-        glog.Error("Error purging persisted chunks: ", err)
+        glog.Error("Error dropping persisted chunks: ", err)
     }
     if allDropped {
-        if err := s.persistence.dropArchivedMetric(fp); err != nil {
-            glog.Errorf("Error dropping archived metric for fingerprint %v: %v", fp, err)
+        if err := s.persistence.purgeArchivedMetric(fp); err != nil {
+            glog.Errorf("Error purging archived metric for fingerprint %v: %v", fp, err)
             return
         }
         s.seriesOps.WithLabelValues(archivePurge).Inc()
@@ -36,6 +36,9 @@ func TestGetFingerprintsForLabelMatchers(t *testing.T) {
 // TestLoop is just a smoke test for the loop method, if we can switch it on and
 // off without disaster.
 func TestLoop(t *testing.T) {
+    if testing.Short() {
+        t.Skip("Skipping test in short mode.")
+    }
     samples := make(clientmodel.Samples, 1000)
     for i := range samples {
         samples[i] = &clientmodel.Sample{
@@ -57,8 +60,18 @@ func TestLoop(t *testing.T) {
     }
     storage.Start()
     storage.AppendSamples(samples)
-    time.Sleep(time.Second)
+    storage.WaitForIndexing()
+    series, _ := storage.(*memorySeriesStorage).fpToSeries.get(clientmodel.Metric{}.Fingerprint())
+    cdsBefore := len(series.chunkDescs)
+    time.Sleep(fpMaxWaitDuration + time.Second) // TODO(beorn7): Ugh, need to wait for maintenance to kick in.
+    cdsAfter := len(series.chunkDescs)
     storage.Stop()
+    if cdsBefore <= cdsAfter {
+        t.Errorf(
+            "Number of chunk descriptors should have gone down by now. Got before %d, after %d.",
+            cdsBefore, cdsAfter,
+        )
+    }
 }

 func TestChunk(t *testing.T) {
|
@ -337,15 +350,15 @@ func TestEvictAndPurgeSeries(t *testing.T) {
|
||||||
s, closer := NewTestStorage(t)
|
s, closer := NewTestStorage(t)
|
||||||
defer closer.Close()
|
defer closer.Close()
|
||||||
|
|
||||||
ms := s.(*memorySeriesStorage) // Going to test the internal purgeSeries method.
|
ms := s.(*memorySeriesStorage) // Going to test the internal maintain.*Series methods.
|
||||||
|
|
||||||
s.AppendSamples(samples)
|
s.AppendSamples(samples)
|
||||||
s.WaitForIndexing()
|
s.WaitForIndexing()
|
||||||
|
|
||||||
fp := clientmodel.Metric{}.Fingerprint()
|
fp := clientmodel.Metric{}.Fingerprint()
|
||||||
|
|
||||||
// Purge ~half of the chunks.
|
// Drop ~half of the chunks.
|
||||||
ms.purgeSeries(fp, 1000)
|
ms.maintainMemorySeries(fp, 1000)
|
||||||
it := s.NewIterator(fp)
|
it := s.NewIterator(fp)
|
||||||
actual := it.GetBoundaryValues(metric.Interval{
|
actual := it.GetBoundaryValues(metric.Interval{
|
||||||
OldestInclusive: 0,
|
OldestInclusive: 0,
|
||||||
|
@@ -362,8 +375,8 @@ func TestEvictAndPurgeSeries(t *testing.T) {
         t.Errorf("2nd timestamp: want %v, got %v", want, actual[1].Timestamp)
     }

-    // Purge everything.
-    ms.purgeSeries(fp, 10000)
+    // Drop everything.
+    ms.maintainMemorySeries(fp, 10000)
     it = s.NewIterator(fp)
     actual = it.GetBoundaryValues(metric.Interval{
         OldestInclusive: 0,
@@ -403,18 +416,18 @@ func TestEvictAndPurgeSeries(t *testing.T) {
         t.Fatal("not archived")
     }

-    // Purge ~half of the chunks of an archived series.
-    ms.purgeSeries(fp, 1000)
+    // Drop ~half of the chunks of an archived series.
+    ms.maintainArchivedSeries(fp, 1000)
     archived, _, _, err = ms.persistence.hasArchivedMetric(fp)
     if err != nil {
         t.Fatal(err)
     }
     if !archived {
-        t.Fatal("archived series dropped although only half of the chunks purged")
+        t.Fatal("archived series purged although only half of the chunks dropped")
     }

-    // Purge everything.
-    ms.purgeSeries(fp, 10000)
+    // Drop everything.
+    ms.maintainArchivedSeries(fp, 10000)
     archived, _, _, err = ms.persistence.hasArchivedMetric(fp)
     if err != nil {
         t.Fatal(err)