diff --git a/storage/local/persistence.go b/storage/local/persistence.go
index 4444840fe7..b7a56520d5 100644
--- a/storage/local/persistence.go
+++ b/storage/local/persistence.go
@@ -58,8 +58,10 @@ const (
 )
 
 const (
-	flagChunkDescsLoaded byte = 1 << iota
-	flagHeadChunkPersisted
+	flagHeadChunkPersisted byte = 1 << iota
+	// Add more flags here like:
+	// flagFoo
+	// flagBar
 )
 
 type indexingOpType byte
@@ -228,34 +230,53 @@ func (p *persistence) getLabelValuesForLabelName(ln clientmodel.LabelName) (clie
 }
 
 // persistChunk persists a single chunk of a series. It is the caller's
-// responsibility to not modify chunk concurrently and to not persist or drop anything
-// for the same fingerprint concurrently.
-func (p *persistence) persistChunk(fp clientmodel.Fingerprint, c chunk) error {
+// responsibility to not modify chunk concurrently and to not persist or drop
+// anything for the same fingerprint concurrently. It returns the (zero-based)
+// index of the persisted chunk within the series file. In case of an error, the
+// returned index is -1 (to avoid the misconception that the chunk was written
+// at position 0).
+func (p *persistence) persistChunk(fp clientmodel.Fingerprint, c chunk) (int, error) {
 	// 1. Open chunk file.
 	f, err := p.openChunkFileForWriting(fp)
 	if err != nil {
-		return err
+		return -1, err
 	}
 	defer f.Close()
 
 	b := bufio.NewWriterSize(f, chunkHeaderLen+p.chunkLen)
-	defer b.Flush()
 
 	// 2. Write the header (chunk type and first/last times).
 	err = writeChunkHeader(b, c)
 	if err != nil {
-		return err
+		return -1, err
 	}
 
 	// 3. Write chunk into file.
-	return c.marshal(b)
+	err = c.marshal(b)
+	if err != nil {
+		return -1, err
+	}
+
+	// 4. Determine index within the file.
+	b.Flush()
+	offset, err := f.Seek(0, os.SEEK_CUR)
+	if err != nil {
+		return -1, err
+	}
+	index, err := p.chunkIndexForOffset(offset)
+	if err != nil {
+		return -1, err
+	}
+
+	return index - 1, err
 }
 
 // loadChunks loads a group of chunks of a timeseries by their index. The chunk
 // with the earliest time will have index 0, the following ones will have
-// incrementally larger indexes. It is the caller's responsibility to not
-// persist or drop anything for the same fingerprint concurrently.
-func (p *persistence) loadChunks(fp clientmodel.Fingerprint, indexes []int) ([]chunk, error) {
+// incrementally larger indexes. The indexOffset denotes the offset to be added to
+// each index in indexes. It is the caller's responsibility to not persist or
+// drop anything for the same fingerprint concurrently.
+func (p *persistence) loadChunks(fp clientmodel.Fingerprint, indexes []int, indexOffset int) ([]chunk, error) {
 	// TODO: we need to verify at some point that file length is a multiple of
 	// the chunk size. When is the best time to do this, and where to remember
 	// it? Right now, we only do it when loading chunkDescs.
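For illustration only, and not part of the patch: a minimal, self-contained Go sketch of the offset arithmetic persistChunk now relies on. The chunkIndexForOffset helper mirrors the one added further down in this file; the header and payload sizes are placeholder assumptions, the real values come from the persistence configuration.

package main

import "fmt"

const (
	chunkHeaderLen = 17   // assumed header size, for illustration only
	chunkLen       = 1024 // assumed chunk payload size, for illustration only
)

// chunkIndexForOffset mirrors the helper added in this patch: an offset is
// only valid if it falls exactly on an on-disk chunk boundary.
func chunkIndexForOffset(offset int64) (int, error) {
	if int(offset)%(chunkHeaderLen+chunkLen) != 0 {
		return -1, fmt.Errorf(
			"offset %d is not a multiple of on-disk chunk length %d",
			offset, chunkHeaderLen+chunkLen,
		)
	}
	return int(offset) / (chunkHeaderLen + chunkLen), nil
}

func main() {
	// After writing the chunk with zero-based index 2, the buffered writer is
	// flushed and the file position sits at the end of that chunk, i.e. after
	// three complete on-disk chunks.
	offset := int64(3 * (chunkHeaderLen + chunkLen))
	idx, err := chunkIndexForOffset(offset)
	if err != nil {
		panic(err)
	}
	// persistChunk therefore returns idx-1, the index of the chunk just written.
	fmt.Println(idx - 1) // 2
}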
@@ -268,7 +289,7 @@ func (p *persistence) loadChunks(fp clientmodel.Fingerprint, indexes []int) ([]c
 	chunks := make([]chunk, 0, len(indexes))
 	typeBuf := make([]byte, 1)
 	for _, idx := range indexes {
-		_, err := f.Seek(p.offsetForChunkIndex(idx), os.SEEK_SET)
+		_, err := f.Seek(p.offsetForChunkIndex(idx+indexOffset), os.SEEK_SET)
 		if err != nil {
 			return nil, err
 		}
@@ -339,7 +360,7 @@ func (p *persistence) loadChunkDescs(fp clientmodel.Fingerprint, beforeTime clie
 			chunkFirstTime: clientmodel.Timestamp(binary.LittleEndian.Uint64(chunkTimesBuf)),
 			chunkLastTime:  clientmodel.Timestamp(binary.LittleEndian.Uint64(chunkTimesBuf[8:])),
 		}
-		if !cd.firstTime().Before(beforeTime) {
+		if cd.chunkLastTime.After(beforeTime) {
 			// From here on, we have chunkDescs in memory already.
 			break
 		}
@@ -412,9 +433,6 @@ func (p *persistence) checkpointSeriesMapAndHeads(fingerprintToSeries *seriesMap
 			}
 			realNumberOfSeries++
 			var seriesFlags byte
-			if m.series.chunkDescsLoaded {
-				seriesFlags |= flagChunkDescsLoaded
-			}
 			if m.series.headChunkPersisted {
 				seriesFlags |= flagHeadChunkPersisted
 			}
@@ -430,6 +448,9 @@ func (p *persistence) checkpointSeriesMapAndHeads(fingerprintToSeries *seriesMap
 				return
 			}
 			w.Write(buf)
+			if _, err = codable.EncodeVarint(w, int64(m.series.chunkDescsOffset)); err != nil {
+				return
+			}
 			if _, err = codable.EncodeVarint(w, int64(len(m.series.chunkDescs))); err != nil {
 				return
 			}
@@ -523,6 +544,10 @@ func (p *persistence) loadSeriesMapAndHeads() (*seriesMap, error) {
 		if err := metric.UnmarshalFromReader(r); err != nil {
 			return nil, err
 		}
+		chunkDescsOffset, err := binary.ReadVarint(r)
+		if err != nil {
+			return nil, err
+		}
 		numChunkDescs, err := binary.ReadVarint(r)
 		if err != nil {
 			return nil, err
 		}
@@ -562,7 +587,7 @@ func (p *persistence) loadSeriesMapAndHeads() (*seriesMap, error) {
 		fingerprintToSeries[clientmodel.Fingerprint(fp)] = &memorySeries{
 			metric:             clientmodel.Metric(metric),
 			chunkDescs:         chunkDescs,
-			chunkDescsLoaded:   seriesFlags&flagChunkDescsLoaded != 0,
+			chunkDescsOffset:   int(chunkDescsOffset),
 			headChunkPersisted: headChunkPersisted,
 		}
 	}
@@ -572,24 +597,25 @@ func (p *persistence) loadSeriesMapAndHeads() (*seriesMap, error) {
 }
 
 // dropChunks deletes all chunks from a series whose last sample time is before
-// beforeTime. It returns true if all chunks of the series have been deleted.
-// It is the caller's responsibility to make sure nothing is persisted or loaded
-// for the same fingerprint concurrently.
-func (p *persistence) dropChunks(fp clientmodel.Fingerprint, beforeTime clientmodel.Timestamp) (bool, error) {
+// beforeTime. It returns the number of deleted chunks and true if all chunks of
+// the series have been deleted. It is the caller's responsibility to make sure
+// nothing is persisted or loaded for the same fingerprint concurrently.
+func (p *persistence) dropChunks(fp clientmodel.Fingerprint, beforeTime clientmodel.Timestamp) (int, bool, error) {
 	f, err := p.openChunkFileForReading(fp)
 	if os.IsNotExist(err) {
-		return true, nil
+		return 0, true, nil
 	}
 	if err != nil {
-		return false, err
+		return 0, false, err
 	}
 	defer f.Close()
 
 	// Find the first chunk that should be kept.
-	for i := 0; ; i++ {
+	var i int
+	for ; ; i++ {
 		_, err := f.Seek(p.offsetForChunkIndex(i)+chunkHeaderLastTimeOffset, os.SEEK_SET)
 		if err != nil {
-			return false, err
+			return 0, false, err
 		}
 		lastTimeBuf := make([]byte, 8)
 		_, err = io.ReadAtLeast(f, lastTimeBuf, 8)
@@ -598,12 +624,12 @@ func (p *persistence) dropChunks(fp clientmodel.Fingerprint, beforeTime clientmo
 			// be kept. Remove the whole file.
 			chunkOps.WithLabelValues(purge).Add(float64(i))
 			if err := os.Remove(f.Name()); err != nil {
-				return true, err
+				return 0, true, err
 			}
-			return true, nil
+			return i, true, nil
 		}
 		if err != nil {
-			return false, err
+			return 0, false, err
 		}
 		lastTime := clientmodel.Timestamp(binary.LittleEndian.Uint64(lastTimeBuf))
 		if !lastTime.Before(beforeTime) {
@@ -617,21 +643,23 @@ func (p *persistence) dropChunks(fp clientmodel.Fingerprint, beforeTime clientmo
 	// file.
 	_, err = f.Seek(-(chunkHeaderLastTimeOffset + 8), os.SEEK_CUR)
 	if err != nil {
-		return false, err
+		return 0, false, err
 	}
 	temp, err := os.OpenFile(p.tempFileNameForFingerprint(fp), os.O_WRONLY|os.O_CREATE, 0640)
 	if err != nil {
-		return false, err
+		return 0, false, err
 	}
 	defer temp.Close()
 
 	if _, err := io.Copy(temp, f); err != nil {
-		return false, err
+		return 0, false, err
 	}
 
-	os.Rename(p.tempFileNameForFingerprint(fp), p.fileNameForFingerprint(fp))
-	return false, nil
+	if err := os.Rename(p.tempFileNameForFingerprint(fp), p.fileNameForFingerprint(fp)); err != nil {
+		return 0, false, err
+	}
+	return i, false, nil
 }
 
 // indexMetric queues the given metric for addition to the indexes needed by
@@ -836,6 +864,16 @@ func (p *persistence) offsetForChunkIndex(i int) int64 {
 	return int64(i * (chunkHeaderLen + p.chunkLen))
 }
 
+func (p *persistence) chunkIndexForOffset(offset int64) (int, error) {
+	if int(offset)%(chunkHeaderLen+p.chunkLen) != 0 {
+		return -1, fmt.Errorf(
+			"offset %d is not a multiple of on-disk chunk length %d",
+			offset, chunkHeaderLen+p.chunkLen,
+		)
+	}
+	return int(offset) / (chunkHeaderLen + p.chunkLen), nil
+}
+
 func (p *persistence) headsFileName() string {
 	return path.Join(p.basePath, headsFileName)
 }
diff --git a/storage/local/persistence_test.go b/storage/local/persistence_test.go
index 66b09a8a7d..52bfb1d86d 100644
--- a/storage/local/persistence_test.go
+++ b/storage/local/persistence_test.go
@@ -82,10 +82,14 @@ func TestPersistChunk(t *testing.T) {
 	fpToChunks := buildTestChunks()
 
 	for fp, chunks := range fpToChunks {
-		for _, c := range chunks {
-			if err := p.persistChunk(fp, c); err != nil {
+		for i, c := range chunks {
+			index, err := p.persistChunk(fp, c)
+			if err != nil {
 				t.Fatal(err)
 			}
+			if i != index {
+				t.Errorf("Want chunk index %d, got %d.", i, index)
+			}
 		}
 	}
 
@@ -94,7 +98,7 @@ func TestPersistChunk(t *testing.T) {
 		for i := range expectedChunks {
 			indexes = append(indexes, i)
 		}
-		actualChunks, err := p.loadChunks(fp, indexes)
+		actualChunks, err := p.loadChunks(fp, indexes, 0)
 		if err != nil {
 			t.Fatal(err)
 		}
diff --git a/storage/local/series.go b/storage/local/series.go
index b77df8c639..a7b47e5ad3 100644
--- a/storage/local/series.go
+++ b/storage/local/series.go
@@ -136,10 +136,16 @@ type memorySeries struct {
 	metric clientmodel.Metric
 	// Sorted by start time, overlapping chunk ranges are forbidden.
 	chunkDescs []*chunkDesc
-	// Whether chunkDescs for chunks on disk are all loaded. If false, some
-	// (or all) chunkDescs are only on disk. These chunks are all contiguous
-	// and at the tail end.
-	chunkDescsLoaded bool
+	// The chunkDescs in memory might not have all the chunkDescs for the
+	// chunks that are persisted to disk. The missing chunkDescs are all
+	// contiguous and at the tail end. chunkDescsOffset is the index of the
+	// chunk on disk that corresponds to the first chunkDesc in memory. If
+	// it is 0, the chunkDescs are all loaded. A value of -1 denotes a
+	// special case: There are chunks on disk, but the offset to the
+	// chunkDescs in memory is unknown. Also, there is no overlap between
+	// chunks on disk and chunks in memory (implying that upon first
+	// persisting of a chunk in memory, the offset has to be set).
+	chunkDescsOffset int
 	// Whether the current head chunk has already been scheduled to be
 	// persisted. If true, the current head chunk must not be modified
 	// anymore.
@@ -155,16 +161,20 @@ type memorySeries struct {
 // or (if false) a series for a metric being unarchived, i.e. a series that
 // existed before but has been evicted from memory.
 func newMemorySeries(m clientmodel.Metric, reallyNew bool) *memorySeries {
-	return &memorySeries{
+	s := memorySeries{
 		metric:             m,
-		chunkDescsLoaded:   reallyNew,
 		headChunkPersisted: !reallyNew,
 	}
+	if !reallyNew {
+		s.chunkDescsOffset = -1
+	}
+	return &s
 }
 
 // add adds a sample pair to the series.
+// It returns chunkDescs that must be queued to be persisted.
 // The caller must have locked the fingerprint of the series.
-func (s *memorySeries) add(fp clientmodel.Fingerprint, v *metric.SamplePair, persistQueue chan *persistRequest) {
+func (s *memorySeries) add(fp clientmodel.Fingerprint, v *metric.SamplePair) []*chunkDesc {
 	if len(s.chunkDescs) == 0 || s.headChunkPersisted {
 		newHead := newChunkDesc(newDeltaEncodedChunk(d1, d0, true))
 		s.chunkDescs = append(s.chunkDescs, newHead)
@@ -187,33 +197,27 @@ func (s *memorySeries) add(fp clientmodel.Fingerprint, v *metric.SamplePair, per
 	}
 
 	chunks := s.head().add(v)
-	s.head().chunk = chunks[0]
+
+	var chunkDescsToPersist []*chunkDesc
 	if len(chunks) > 1 {
-		queuePersist := func(cd *chunkDesc) {
-			persistQueue <- &persistRequest{
-				fingerprint: fp,
-				chunkDesc:   cd,
-			}
-		}
-
-		queuePersist(s.head())
-
+		chunkDescsToPersist = append(chunkDescsToPersist, s.head())
 		for i, c := range chunks[1:] {
 			cd := newChunkDesc(c)
 			s.chunkDescs = append(s.chunkDescs, cd)
 			// The last chunk is still growing.
 			if i < len(chunks[1:])-1 {
-				queuePersist(cd)
+				chunkDescsToPersist = append(chunkDescsToPersist, cd)
 			}
 		}
 	}
+	return chunkDescsToPersist
 }
 
 // evictOlderThan marks for eviction all chunks whose latest sample is older
-// than the given timestamp. It returns true if all chunks in the series were
-// immediately evicted (i.e. all chunks are older than the timestamp, and none
-// of the chunks was pinned).
+// than the given timestamp. It returns allEvicted as true if all chunks in the
+// series were immediately evicted (i.e. all chunks are older than the
+// timestamp, and none of the chunks was pinned).
 //
 // The method also evicts chunkDescs if there are chunkDescEvictionFactor times
 // more chunkDescs in the series than unevicted chunks. (The number of unevicted
 //
@@ -222,25 +226,22 @@ func (s *memorySeries) add(fp clientmodel.Fingerprint, v *metric.SamplePair, per
 // series, even if chunks in between were evicted.)
 //
 // Special considerations for the head chunk: If it has not been scheduled to be
-// persisted yet but is old enough for eviction, the scheduling happens now. (To
-// do that, the method neets the fingerprint and the persist queue.) It is
-// likely that the actual persisting will not happen soon enough to immediately
-// evict the head chunk, though. Thus, calling evictOlderThan for a series with
-// a non-persisted head chunk will most likely return false, even if no chunk is
-// pinned for other reasons. A series old enough for archiving will usually
+// persisted yet but is old enough for eviction, this method returns
+// persistHeadChunk as true. The caller is then responsible for persisting the
+// head chunk. The internal state of this memorySeries is already set
+// accordingly by this method. Calling evictOlderThan for a series with a
+// non-persisted head chunk that is old enough for eviction will never evict all
+// chunks immediately, even if no chunk is pinned for other reasons, because the
+// head chunk is not persisted yet. A series old enough for archiving will
 // require at least two eviction runs to become ready for archiving: In the
-// first run, its head chunk is scheduled to be persisted. The next call of
+// first run, its head chunk is requested to be persisted. The next call of
 // evictOlderThan will then return true, provided that the series hasn't
 // received new samples in the meantime, the head chunk has now been persisted,
 // and no chunk is pinned for other reasons.
 //
 // The caller must have locked the fingerprint of the series.
-func (s *memorySeries) evictOlderThan(
-	t clientmodel.Timestamp,
-	fp clientmodel.Fingerprint,
-	persistQueue chan *persistRequest,
-) bool {
-	allEvicted := true
+func (s *memorySeries) evictOlderThan(t clientmodel.Timestamp) (allEvicted bool, persistHeadChunk bool) {
+	allEvicted = true
 	iOldestNotEvicted := -1
 
 	defer func() {
@@ -250,8 +251,8 @@ func (s *memorySeries) evictOlderThan(
 		if iOldestNotEvicted != -1 {
 			lenToKeep := chunkDescEvictionFactor * (len(s.chunkDescs) - iOldestNotEvicted)
 			if lenToKeep < len(s.chunkDescs) {
-				s.chunkDescsLoaded = false
 				lenEvicted := len(s.chunkDescs) - lenToKeep
+				s.chunkDescsOffset += lenEvicted
 				chunkDescOps.WithLabelValues(evict).Add(float64(lenEvicted))
 				atomic.AddInt64(&numMemChunkDescs, -int64(lenEvicted))
 				s.chunkDescs = append(
@@ -268,22 +269,19 @@ func (s *memorySeries) evictOlderThan(
 			if iOldestNotEvicted == -1 {
 				iOldestNotEvicted = i
 			}
-			return false
+			return false, false
 		}
 		if cd.isEvicted() {
 			continue
 		}
 		if !s.headChunkPersisted && i == len(s.chunkDescs)-1 {
 			// This is a non-persisted head chunk that is old enough
-			// for eviction. Queue it to be persisted:
+			// for eviction. Request it to be persisted:
+			persistHeadChunk = true
 			s.headChunkPersisted = true
 			// Since we cannot modify the head chunk from now on, we
 			// don't need to bother with cloning anymore.
 			s.headChunkUsedByIterator = false
-			persistQueue <- &persistRequest{
-				fingerprint: fp,
-				chunkDesc:   cd,
-			}
 		}
 		if !cd.evictOnUnpin() {
 			if iOldestNotEvicted == -1 {
@@ -292,13 +290,15 @@ func (s *memorySeries) evictOlderThan(
 			allEvicted = false
 		}
 	}
-	return allEvicted
+	return allEvicted, persistHeadChunk
 }
 
-// purgeOlderThan returns true if all chunks have been purged.
+// purgeOlderThan removes chunkDescs older than t. It also evicts the chunks of
+// those chunkDescs (although that's probably not even necessary). It returns
+// the number of purged chunkDescs and true if all chunkDescs have been purged.
 //
 // The caller must have locked the fingerprint of the series.
-func (s *memorySeries) purgeOlderThan(t clientmodel.Timestamp) bool {
+func (s *memorySeries) purgeOlderThan(t clientmodel.Timestamp) (int, bool) {
 	keepIdx := len(s.chunkDescs)
 	for i, cd := range s.chunkDescs {
 		if !cd.lastTime().Before(t) {
@@ -307,9 +307,11 @@ func (s *memorySeries) purgeOlderThan(t clientmodel.Timestamp) bool {
 		}
 		s.chunkDescs[i].evictOnUnpin()
 	}
-	s.chunkDescs = append(make([]*chunkDesc, 0, len(s.chunkDescs)-keepIdx), s.chunkDescs[keepIdx:]...)
-	atomic.AddInt64(&numMemChunkDescs, -int64(keepIdx))
-	return len(s.chunkDescs) == 0
+	if keepIdx > 0 {
+		s.chunkDescs = append(make([]*chunkDesc, 0, len(s.chunkDescs)-keepIdx), s.chunkDescs[keepIdx:]...)
+		atomic.AddInt64(&numMemChunkDescs, -int64(keepIdx))
+	}
+	return keepIdx, len(s.chunkDescs) == 0
 }
 
 // preloadChunks is an internal helper method.
@@ -327,8 +329,11 @@ func (s *memorySeries) preloadChunks(indexes []int, p *persistence) ([]*chunkDes
 	chunkOps.WithLabelValues(pin).Add(float64(len(pinnedChunkDescs)))
 
 	if len(loadIndexes) > 0 {
+		if s.chunkDescsOffset == -1 {
+			panic("requested loading chunks from persistence in a situation where we must not have persisted data for chunk descriptors in memory")
+		}
 		fp := s.metric.Fingerprint()
-		chunks, err := p.loadChunks(fp, loadIndexes)
+		chunks, err := p.loadChunks(fp, loadIndexes, s.chunkDescsOffset)
 		if err != nil {
 			// Unpin the chunks since we won't return them as pinned chunks now.
 			for _, cd := range pinnedChunkDescs {
@@ -388,13 +393,13 @@ func (s *memorySeries) preloadChunksForRange(
 	if len(s.chunkDescs) > 0 {
 		firstChunkDescTime = s.chunkDescs[0].firstTime()
 	}
-	if !s.chunkDescsLoaded && from.Before(firstChunkDescTime) {
+	if s.chunkDescsOffset != 0 && from.Before(firstChunkDescTime) {
 		cds, err := p.loadChunkDescs(fp, firstChunkDescTime)
 		if err != nil {
 			return nil, err
 		}
 		s.chunkDescs = append(cds, s.chunkDescs...)
-		s.chunkDescsLoaded = true
+		s.chunkDescsOffset = 0
 	}
 
 	if len(s.chunkDescs) == 0 {
diff --git a/storage/local/storage.go b/storage/local/storage.go
index 5ee336b2a3..cd252dc11e 100644
--- a/storage/local/storage.go
+++ b/storage/local/storage.go
@@ -43,15 +43,15 @@ type persistRequest struct {
 }
 
 type memorySeriesStorage struct {
-	fpLocker            *fingerprintLocker
-	fingerprintToSeries *seriesMap
+	fpLocker   *fingerprintLocker
+	fpToSeries *seriesMap
 
 	loopStopping, loopStopped chan struct{}
 	evictInterval, evictAfter time.Duration
 	purgeInterval, purgeAfter time.Duration
 	checkpointInterval        time.Duration
 
-	persistQueue   chan *persistRequest
+	persistQueue   chan persistRequest
 	persistStopped chan struct{}
 	persistence    *persistence
 
@@ -84,22 +84,22 @@ func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) (Storage, error) {
 		return nil, err
 	}
 	glog.Info("Loading series map and head chunks...")
-	fingerprintToSeries, err := p.loadSeriesMapAndHeads()
+	fpToSeries, err := p.loadSeriesMapAndHeads()
 	if err != nil {
 		return nil, err
 	}
-	glog.Infof("%d series loaded.", fingerprintToSeries.length())
+	glog.Infof("%d series loaded.", fpToSeries.length())
 	numSeries := prometheus.NewGauge(prometheus.GaugeOpts{
 		Namespace: namespace,
 		Subsystem: subsystem,
 		Name:      "memory_series",
 		Help:      "The current number of series in memory.",
 	})
-	numSeries.Set(float64(fingerprintToSeries.length()))
+	numSeries.Set(float64(fpToSeries.length()))
 
 	return &memorySeriesStorage{
-		fpLocker:            newFingerprintLocker(100), // TODO: Tweak value.
-		fingerprintToSeries: fingerprintToSeries,
+		fpLocker:   newFingerprintLocker(100), // TODO: Tweak value.
+		fpToSeries: fpToSeries,
 
 		loopStopping: make(chan struct{}),
 		loopStopped:  make(chan struct{}),
@@ -109,7 +109,7 @@ func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) (Storage, error) {
 		purgeAfter:         o.PersistenceRetentionPeriod,
 		checkpointInterval: o.CheckpointInterval,
 
-		persistQueue:   make(chan *persistRequest, persistQueueCap),
+		persistQueue:   make(chan persistRequest, persistQueueCap),
 		persistStopped: make(chan struct{}),
 		persistence:    p,
 
@@ -182,7 +182,7 @@ func (s *memorySeriesStorage) Stop() error {
 	<-s.persistStopped
 
 	// One final checkpoint of the series map and the head chunks.
-	if err := s.persistence.checkpointSeriesMapAndHeads(s.fingerprintToSeries, s.fpLocker); err != nil {
+	if err := s.persistence.checkpointSeriesMapAndHeads(s.fpToSeries, s.fpLocker); err != nil {
 		return err
 	}
 
@@ -202,7 +202,7 @@ func (s *memorySeriesStorage) NewIterator(fp clientmodel.Fingerprint) SeriesIter
 	s.fpLocker.Lock(fp)
 	defer s.fpLocker.Unlock(fp)
 
-	series, ok := s.fingerprintToSeries.get(fp)
+	series, ok := s.fpToSeries.get(fp)
 	if !ok {
 		// Oops, no series for fp found. That happens if, after
 		// preloading is done, the whole series is identified as old
@@ -301,7 +301,7 @@ func (s *memorySeriesStorage) GetMetricForFingerprint(fp clientmodel.Fingerprint
 	s.fpLocker.Lock(fp)
 	defer s.fpLocker.Unlock(fp)
 
-	series, ok := s.fingerprintToSeries.get(fp)
+	series, ok := s.fpToSeries.get(fp)
 	if ok {
 		// Copy required here because caller might mutate the returned
 		// metric.
@@ -330,17 +330,21 @@ func (s *memorySeriesStorage) AppendSamples(samples clientmodel.Samples) {
 func (s *memorySeriesStorage) appendSample(sample *clientmodel.Sample) {
 	fp := sample.Metric.Fingerprint()
 	s.fpLocker.Lock(fp)
-	defer s.fpLocker.Unlock(fp)
-
 	series := s.getOrCreateSeries(fp, sample.Metric)
-	series.add(fp, &metric.SamplePair{
+	chunkDescsToPersist := series.add(fp, &metric.SamplePair{
 		Value:     sample.Value,
 		Timestamp: sample.Timestamp,
-	}, s.persistQueue)
+	})
+	s.fpLocker.Unlock(fp)
+	// Queue only outside of the locked area, processing the persistQueue
+	// requires the same lock!
+	for _, cd := range chunkDescsToPersist {
+		s.persistQueue <- persistRequest{fp, cd}
+	}
 }
 
 func (s *memorySeriesStorage) getOrCreateSeries(fp clientmodel.Fingerprint, m clientmodel.Metric) *memorySeries {
-	series, ok := s.fingerprintToSeries.get(fp)
+	series, ok := s.fpToSeries.get(fp)
 	if !ok {
 		unarchived, err := s.persistence.unarchiveMetric(fp)
 		if err != nil {
@@ -354,7 +358,7 @@ func (s *memorySeriesStorage) getOrCreateSeries(fp clientmodel.Fingerprint, m cl
 			s.seriesOps.WithLabelValues(create).Inc()
 		}
 		series = newMemorySeries(m, !unarchived)
-		s.fingerprintToSeries.put(fp, series)
+		s.fpToSeries.put(fp, series)
 		s.numSeries.Inc()
 	}
 	return series
@@ -362,7 +366,7 @@ func (s *memorySeriesStorage) getOrCreateSeries(fp clientmodel.Fingerprint, m cl
 
 /*
 func (s *memorySeriesStorage) preloadChunksAtTime(fp clientmodel.Fingerprint, ts clientmodel.Timestamp) (chunkDescs, error) {
-	series, ok := s.fingerprintToSeries.get(fp)
+	series, ok := s.fpToSeries.get(fp)
 	if !ok {
 		panic("requested preload for non-existent series")
 	}
@@ -378,7 +382,7 @@ func (s *memorySeriesStorage) preloadChunksForRange(
 	s.fpLocker.Lock(fp)
 	defer s.fpLocker.Unlock(fp)
 
-	series, ok := s.fingerprintToSeries.get(fp)
+	series, ok := s.fpToSeries.get(fp)
 	if !ok {
 		has, first, last, err := s.persistence.hasArchivedMetric(fp)
 		if err != nil {
@@ -404,12 +408,21 @@ func (s *memorySeriesStorage) handlePersistQueue() {
 	for req := range s.persistQueue {
 		s.persistQueueLength.Set(float64(len(s.persistQueue)))
 		start := time.Now()
-		err := s.persistence.persistChunk(req.fingerprint, req.chunkDesc.chunk)
+		s.fpLocker.Lock(req.fingerprint)
+		offset, err := s.persistence.persistChunk(req.fingerprint, req.chunkDesc.chunk)
+		if series, seriesInMemory := s.fpToSeries.get(req.fingerprint); err == nil && seriesInMemory && series.chunkDescsOffset == -1 {
+			// This is the first chunk persisted for a newly created
+			// series that had prior chunks on disk. Finally, we can
+			// set the chunkDescsOffset.
+			series.chunkDescsOffset = offset
+		}
+		s.fpLocker.Unlock(req.fingerprint)
 		s.persistLatency.Observe(float64(time.Since(start)) / float64(time.Microsecond))
 		if err != nil {
 			s.persistErrors.WithLabelValues(err.Error()).Inc()
-			glog.Error("Error persisting chunk, requeuing: ", err)
-			s.persistQueue <- req
+			glog.Error("Error persisting chunk: ", err)
+			glog.Error("The storage is now inconsistent. Prepare for disaster.")
+			// TODO: Remove respective chunkDesc to at least be consistent?
 			continue
 		}
 		req.chunkDesc.unpin()
@@ -436,13 +449,13 @@ func (s *memorySeriesStorage) loop() {
 		case <-s.loopStopping:
 			return
 		case <-checkpointTicker.C:
-			s.persistence.checkpointSeriesMapAndHeads(s.fingerprintToSeries, s.fpLocker)
+			s.persistence.checkpointSeriesMapAndHeads(s.fpToSeries, s.fpLocker)
 		case <-evictTicker.C:
 			// TODO: Change this to be based on number of chunks in memory.
 			glog.Info("Evicting chunks...")
 			begin := time.Now()
 
-			for m := range s.fingerprintToSeries.iter() {
+			for m := range s.fpToSeries.iter() {
 				select {
 				case <-s.loopStopping:
 					glog.Info("Interrupted evicting chunks.")
@@ -451,11 +464,11 @@ func (s *memorySeriesStorage) loop() {
 					// Keep going.
 				}
 				s.fpLocker.Lock(m.fp)
-				if m.series.evictOlderThan(
-					clientmodel.TimestampFromTime(time.Now()).Add(-1*s.evictAfter),
-					m.fp, s.persistQueue,
-				) {
-					s.fingerprintToSeries.del(m.fp)
+				allEvicted, persistHeadChunk := m.series.evictOlderThan(
+					clientmodel.TimestampFromTime(time.Now()).Add(-1 * s.evictAfter),
+				)
+				if allEvicted {
+					s.fpToSeries.del(m.fp)
 					s.numSeries.Dec()
 					if err := s.persistence.archiveMetric(
 						m.fp, m.series.metric, m.series.firstTime(), m.series.lastTime(),
@@ -466,6 +479,10 @@ func (s *memorySeriesStorage) loop() {
 					}
 				}
 				s.fpLocker.Unlock(m.fp)
+				// Queue outside of lock!
+				if persistHeadChunk {
+					s.persistQueue <- persistRequest{m.fp, m.series.head()}
+				}
 			}
 
 			duration := time.Since(begin)
@@ -476,7 +493,7 @@ func (s *memorySeriesStorage) loop() {
 			ts := clientmodel.TimestampFromTime(time.Now()).Add(-1 * s.purgeAfter)
 			begin := time.Now()
 
-			for fp := range s.fingerprintToSeries.fpIter() {
+			for fp := range s.fpToSeries.fpIter() {
 				select {
 				case <-s.loopStopping:
 					glog.Info("Interrupted purging series.")
@@ -515,18 +532,24 @@ func (s *memorySeriesStorage) purgeSeries(fp clientmodel.Fingerprint, beforeTime
 	defer s.fpLocker.Unlock(fp)
 
 	// First purge persisted chunks. We need to do that anyway.
-	allDropped, err := s.persistence.dropChunks(fp, beforeTime)
+	numDropped, allDropped, err := s.persistence.dropChunks(fp, beforeTime)
 	if err != nil {
 		glog.Error("Error purging persisted chunks: ", err)
 	}
 
 	// Purge chunks from memory accordingly.
-	if series, ok := s.fingerprintToSeries.get(fp); ok {
-		if series.purgeOlderThan(beforeTime) && allDropped {
-			s.fingerprintToSeries.del(fp)
+	if series, ok := s.fpToSeries.get(fp); ok {
+		numPurged, allPurged := series.purgeOlderThan(beforeTime)
+		if allPurged && allDropped {
+			s.fpToSeries.del(fp)
 			s.numSeries.Dec()
 			s.seriesOps.WithLabelValues(memoryPurge).Inc()
 			s.persistence.unindexMetric(series.metric, fp)
+		} else if series.chunkDescsOffset != -1 {
+			series.chunkDescsOffset += numPurged - numDropped
+			if series.chunkDescsOffset < 0 {
+				panic("dropped more chunks from persistence than from memory")
+			}
 		}
 		return
 	}
diff --git a/storage/local/storage_test.go b/storage/local/storage_test.go
index 8c905b5111..ef51361099 100644
--- a/storage/local/storage_test.go
+++ b/storage/local/storage_test.go
@@ -37,7 +37,7 @@ func TestChunk(t *testing.T) {
 	s.AppendSamples(samples)
 
-	for m := range s.(*memorySeriesStorage).fingerprintToSeries.iter() {
+	for m := range s.(*memorySeriesStorage).fpToSeries.iter() {
 		for i, v := range m.series.values() {
 			if samples[i].Timestamp != v.Timestamp {
 				t.Fatalf("%d. Got %v; want %v", i, v.Timestamp, samples[i].Timestamp)
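To tie the chunkDescsOffset bookkeeping together, here is a small self-contained sketch (not part of the patch): dropping chunks from the series file and purging chunkDescs from memory shift the offset by their difference, as in the new purgeSeries code, and preloadChunks hands the offset to loadChunks, which adds it to every in-memory index. The toySeries type and all numbers are invented purely for illustration.

package main

import "fmt"

// Toy model of the bookkeeping: chunkDescsOffset is the on-disk index of the
// chunk backing the first chunkDesc held in memory (0 = everything loaded,
// -1 = unknown because nothing has been persisted yet for the in-memory
// chunkDescs).
type toySeries struct {
	chunkDescsOffset int
	numChunkDescs    int
}

func main() {
	// Chunks 0..9 are on disk; only the chunkDescs for chunks 6..9 are in
	// memory, so the offset is 6.
	s := toySeries{chunkDescsOffset: 6, numChunkDescs: 4}

	// purgeSeries drops 7 chunks from the series file (0..6) and purges the
	// one in-memory chunkDesc that also fell before the purge timestamp
	// (the one for chunk 6). The offset shifts by the difference:
	numDropped, numPurged := 7, 1
	s.chunkDescsOffset += numPurged - numDropped
	s.numChunkDescs -= numPurged
	fmt.Println(s.chunkDescsOffset) // 0: in-memory chunkDescs now start at on-disk index 0

	// preloadChunks translates in-memory indexes into on-disk indexes by
	// handing the offset to loadChunks, which adds it to every index:
	memIndexes := []int{0, 1, 2}
	diskIndexes := make([]int, 0, len(memIndexes))
	for _, idx := range memIndexes {
		diskIndexes = append(diskIndexes, idx+s.chunkDescsOffset)
	}
	fmt.Println(diskIndexes) // [0 1 2]
}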