From 752fac60ae4a07e742dd5908120d8bc3b9fe656b Mon Sep 17 00:00:00 2001 From: beorn7 Date: Wed, 1 Feb 2017 19:41:15 +0100 Subject: [PATCH 1/6] storage: Remove race condition from TestLoop --- storage/local/storage_test.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/storage/local/storage_test.go b/storage/local/storage_test.go index d63b6d3ab..591e3df37 100644 --- a/storage/local/storage_test.go +++ b/storage/local/storage_test.go @@ -840,10 +840,15 @@ func TestLoop(t *testing.T) { storage.Append(s) } storage.WaitForIndexing() - series, _ := storage.fpToSeries.get(model.Metric{}.FastFingerprint()) + fp := model.Metric{}.FastFingerprint() + series, _ := storage.fpToSeries.get(fp) + storage.fpLocker.Lock(fp) cdsBefore := len(series.chunkDescs) + storage.fpLocker.Unlock(fp) time.Sleep(fpMaxWaitDuration + time.Second) // TODO(beorn7): Ugh, need to wait for maintenance to kick in. + storage.fpLocker.Lock(fp) cdsAfter := len(series.chunkDescs) + storage.fpLocker.Unlock(fp) storage.Stop() if cdsBefore <= cdsAfter { t.Errorf( From 65dc8f44d3c7cfb8981026afdacd67aa7cd2dfa7 Mon Sep 17 00:00:00 2001 From: beorn7 Date: Wed, 1 Feb 2017 20:14:01 +0100 Subject: [PATCH 2/6] storage: Test for errors returned by MaybePopulateLastTime --- storage/local/heads.go | 4 +++- storage/local/persistence.go | 2 +- storage/local/series.go | 13 +++++++------ storage/local/storage.go | 7 ++++++- 4 files changed, 17 insertions(+), 9 deletions(-) diff --git a/storage/local/heads.go b/storage/local/heads.go index 14a317592..15d2fcabb 100644 --- a/storage/local/heads.go +++ b/storage/local/heads.go @@ -188,7 +188,9 @@ func (hs *headsScanner) scan() bool { // This is NOT the head chunk. So it's a chunk // to be persisted, and we need to populate lastTime. hs.chunksToPersistTotal++ - cd.MaybePopulateLastTime() + if hs.err = cd.MaybePopulateLastTime(); hs.err != nil { + return false + } } chunkDescs[i] = cd } diff --git a/storage/local/persistence.go b/storage/local/persistence.go index 5015fb3c6..ebaceeaea 100644 --- a/storage/local/persistence.go +++ b/storage/local/persistence.go @@ -693,7 +693,7 @@ func (p *persistence) checkpointSeriesMapAndHeads(fingerprintToSeries *seriesMap } // persistWatermark. We only checkpoint chunks that need persisting, so // this is always 0. - if _, err = codable.EncodeVarint(w, int64(0)); err != nil { + if _, err = codable.EncodeVarint(w, 0); err != nil { return } if m.series.modTime.IsZero() { diff --git a/storage/local/series.go b/storage/local/series.go index 4a97c3c5c..bb4ee6bc5 100644 --- a/storage/local/series.go +++ b/storage/local/series.go @@ -247,7 +247,9 @@ func (s *memorySeries) add(v model.SamplePair) (int, error) { // Populate lastTime of now-closed chunks. for _, cd := range s.chunkDescs[len(s.chunkDescs)-len(chunks) : len(s.chunkDescs)-1] { - cd.MaybePopulateLastTime() + if err := cd.MaybePopulateLastTime(); err != nil { + return 0, err + } } s.lastTime = v.Timestamp @@ -261,19 +263,18 @@ func (s *memorySeries) add(v model.SamplePair) (int, error) { // If the head chunk is already closed, the method is a no-op and returns false. // // The caller must have locked the fingerprint of the series. -func (s *memorySeries) maybeCloseHeadChunk() bool { +func (s *memorySeries) maybeCloseHeadChunk() (bool, error) { if s.headChunkClosed { - return false + return false, nil } if time.Now().Sub(s.lastTime.Time()) > headChunkTimeout { s.headChunkClosed = true // Since we cannot modify the head chunk from now on, we // don't need to bother with cloning anymore. 
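		// With the chunk closed it can no longer change, so iterators no
		// longer need a cloned copy and the flag below can be reset. Any
		// error from MaybePopulateLastTime on the head chunk is now returned
		// to the caller, which is expected to quarantine the series (see the
		// storage.go hunk below).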
s.headChunkUsedByIterator = false - s.head().MaybePopulateLastTime() - return true + return true, s.head().MaybePopulateLastTime() } - return false + return false, nil } // evictChunkDescs evicts chunkDescs if the chunk is evicted. diff --git a/storage/local/storage.go b/storage/local/storage.go index 8c8ca8d2f..aa0745e98 100644 --- a/storage/local/storage.go +++ b/storage/local/storage.go @@ -1376,7 +1376,12 @@ func (s *MemorySeriesStorage) maintainMemorySeries( defer s.seriesOps.WithLabelValues(memoryMaintenance).Inc() - if series.maybeCloseHeadChunk() { + closed, err := series.maybeCloseHeadChunk() + if err != nil { + s.quarantineSeries(fp, series.metric, err) + s.persistErrors.Inc() + } + if closed { s.incNumChunksToPersist(1) } From 31e9db7f0cec1719834aeca2cda17e02a7a3b434 Mon Sep 17 00:00:00 2001 From: beorn7 Date: Sat, 4 Feb 2017 22:29:37 +0100 Subject: [PATCH 3/6] storage: Simplify evictChunkDesc method --- storage/local/series.go | 33 ++++++++++++++++----------------- 1 file changed, 16 insertions(+), 17 deletions(-) diff --git a/storage/local/series.go b/storage/local/series.go index bb4ee6bc5..d9c36d0ef 100644 --- a/storage/local/series.go +++ b/storage/local/series.go @@ -277,24 +277,23 @@ func (s *memorySeries) maybeCloseHeadChunk() (bool, error) { return false, nil } -// evictChunkDescs evicts chunkDescs if the chunk is evicted. -// iOldestNotEvicted is the index within the current chunkDescs of the oldest -// chunk that is not evicted. -func (s *memorySeries) evictChunkDescs(iOldestNotEvicted int) { - lenToKeep := len(s.chunkDescs) - iOldestNotEvicted - if lenToKeep < len(s.chunkDescs) { - s.savedFirstTime = s.firstTime() - lenEvicted := len(s.chunkDescs) - lenToKeep - s.chunkDescsOffset += lenEvicted - s.persistWatermark -= lenEvicted - chunk.DescOps.WithLabelValues(chunk.Evict).Add(float64(lenEvicted)) - chunk.NumMemDescs.Sub(float64(lenEvicted)) - s.chunkDescs = append( - make([]*chunk.Desc, 0, lenToKeep), - s.chunkDescs[lenEvicted:]..., - ) - s.dirty = true +// evictChunkDescs evicts chunkDescs. lenToEvict is the index within the current +// chunkDescs of the oldest chunk that is not evicted. +func (s *memorySeries) evictChunkDescs(lenToEvict int) { + if lenToEvict < 1 { + return } + lenToKeep := len(s.chunkDescs) - lenToEvict + s.savedFirstTime = s.firstTime() + s.chunkDescsOffset += lenToEvict + s.persistWatermark -= lenToEvict + chunk.DescOps.WithLabelValues(chunk.Evict).Add(float64(lenToEvict)) + chunk.NumMemDescs.Sub(float64(lenToEvict)) + s.chunkDescs = append( + make([]*chunk.Desc, 0, lenToKeep), + s.chunkDescs[lenToEvict:]..., + ) + s.dirty = true } // dropChunks removes chunkDescs older than t. The caller must have locked the From 75282b27ba6ae6a45a49eb78afcf39218372ca84 Mon Sep 17 00:00:00 2001 From: beorn7 Date: Sat, 4 Feb 2017 23:40:22 +0100 Subject: [PATCH 4/6] storage: Added checks for invariants --- storage/local/persistence.go | 3 +++ storage/local/series.go | 3 +++ 2 files changed, 6 insertions(+) diff --git a/storage/local/persistence.go b/storage/local/persistence.go index ebaceeaea..a76d5de8e 100644 --- a/storage/local/persistence.go +++ b/storage/local/persistence.go @@ -706,6 +706,9 @@ func (p *persistence) checkpointSeriesMapAndHeads(fingerprintToSeries *seriesMap } } // chunkDescsOffset. 
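	// A negative chunkDescsOffset marks the offset as unknown; in that state
	// a positive persistWatermark is inconsistent, and the value encoded
	// below (chunkDescsOffset + persistWatermark) would be bogus, so the new
	// check panics rather than writing a corrupt checkpoint.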
+ if m.series.chunkDescsOffset < 0 && m.series.persistWatermark > 0 { + panic("encountered unknown chunk desc offset in combination with positive persist watermark") + } if _, err = codable.EncodeVarint(w, int64(m.series.chunkDescsOffset+m.series.persistWatermark)); err != nil { return } diff --git a/storage/local/series.go b/storage/local/series.go index d9c36d0ef..63e4a0e2a 100644 --- a/storage/local/series.go +++ b/storage/local/series.go @@ -283,6 +283,9 @@ func (s *memorySeries) evictChunkDescs(lenToEvict int) { if lenToEvict < 1 { return } + if s.chunkDescsOffset < 0 { + panic("chunk desc eviction requested with unknown chunk desc offset") + } lenToKeep := len(s.chunkDescs) - lenToEvict s.savedFirstTime = s.firstTime() s.chunkDescsOffset += lenToEvict From 244a65fb293573cad7b80407ff0e14ff96297ec4 Mon Sep 17 00:00:00 2001 From: beorn7 Date: Sun, 5 Feb 2017 02:25:09 +0100 Subject: [PATCH 5/6] storage: Increase persist watermark before calling append The append call may reuse cds, and thus change its len. (In practice, this wouldn't happen as cds should have len==cap. Still, the previous order of lines was problematic.) --- storage/local/series.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/storage/local/series.go b/storage/local/series.go index 63e4a0e2a..ba8898a59 100644 --- a/storage/local/series.go +++ b/storage/local/series.go @@ -464,9 +464,9 @@ func (s *memorySeries) preloadChunksForRange( fp, s.chunkDescsOffset, len(cds), ) } + s.persistWatermark += len(cds) s.chunkDescs = append(cds, s.chunkDescs...) s.chunkDescsOffset = 0 - s.persistWatermark += len(cds) if len(s.chunkDescs) > 0 { firstChunkDescTime = s.chunkDescs[0].FirstTime() } From 2363a90adc48b6408315a1f5a9c850fa33fe997f Mon Sep 17 00:00:00 2001 From: beorn7 Date: Mon, 6 Feb 2017 17:39:59 +0100 Subject: [PATCH 6/6] storage: Do not throw away fully persisted memory series in checkpointing --- storage/local/persistence.go | 70 +++++++++++++++++++++++-------- storage/local/persistence_test.go | 31 ++++++++++++-- 2 files changed, 81 insertions(+), 20 deletions(-) diff --git a/storage/local/persistence.go b/storage/local/persistence.go index a76d5de8e..4a3150c7b 100644 --- a/storage/local/persistence.go +++ b/storage/local/persistence.go @@ -670,12 +670,39 @@ func (p *persistence) checkpointSeriesMapAndHeads(fingerprintToSeries *seriesMap defer fpLocker.Unlock(m.fp) chunksToPersist := len(m.series.chunkDescs) - m.series.persistWatermark - if len(m.series.chunkDescs) == 0 || chunksToPersist == 0 { - // This series was completely purged or archived in the meantime or has - // no chunks that need persisting. Ignore. + if len(m.series.chunkDescs) == 0 { + // This series was completely purged or archived + // in the meantime. Ignore. return } realNumberOfSeries++ + + // Sanity checks. + if m.series.chunkDescsOffset < 0 && m.series.persistWatermark > 0 { + panic("encountered unknown chunk desc offset in combination with positive persist watermark") + } + + // These are the values to save in the normal case. + var ( + // persistWatermark is zero as we only checkpoint non-persisted chunks. + persistWatermark int64 + // chunkDescsOffset is shifted by the original persistWatermark for the same reason. + chunkDescsOffset = int64(m.series.chunkDescsOffset + m.series.persistWatermark) + numChunkDescs = int64(chunksToPersist) + ) + // However, in the special case of a series being fully + // persisted but still in memory (i.e. 
not archived), we + // need to save a "placeholder", for which we use just + // the chunk desc of the last chunk. Values have to be + // adjusted accordingly. (The reason for doing it in + // this weird way is to keep the checkpoint format + // compatible with older versions.) + if chunksToPersist == 0 { + persistWatermark = 1 + chunkDescsOffset-- // Save one chunk desc after all. + numChunkDescs = 1 + } + // seriesFlags left empty in v2. if err = w.WriteByte(0); err != nil { return @@ -691,9 +718,7 @@ func (p *persistence) checkpointSeriesMapAndHeads(fingerprintToSeries *seriesMap if _, err = w.Write(buf); err != nil { return } - // persistWatermark. We only checkpoint chunks that need persisting, so - // this is always 0. - if _, err = codable.EncodeVarint(w, 0); err != nil { + if _, err = codable.EncodeVarint(w, persistWatermark); err != nil { return } if m.series.modTime.IsZero() { @@ -705,28 +730,39 @@ func (p *persistence) checkpointSeriesMapAndHeads(fingerprintToSeries *seriesMap return } } - // chunkDescsOffset. - if m.series.chunkDescsOffset < 0 && m.series.persistWatermark > 0 { - panic("encountered unknown chunk desc offset in combination with positive persist watermark") - } - if _, err = codable.EncodeVarint(w, int64(m.series.chunkDescsOffset+m.series.persistWatermark)); err != nil { + if _, err = codable.EncodeVarint(w, chunkDescsOffset); err != nil { return } if _, err = codable.EncodeVarint(w, int64(m.series.savedFirstTime)); err != nil { return } - // Number of chunkDescs. - if _, err = codable.EncodeVarint(w, int64(chunksToPersist)); err != nil { + if _, err = codable.EncodeVarint(w, numChunkDescs); err != nil { return } - for _, chunkDesc := range m.series.chunkDescs[m.series.persistWatermark:] { - if err = w.WriteByte(byte(chunkDesc.C.Encoding())); err != nil { + if chunksToPersist == 0 { + // Save the one placeholder chunk desc for a fully persisted series. + chunkDesc := m.series.chunkDescs[len(m.series.chunkDescs)-1] + if _, err = codable.EncodeVarint(w, int64(chunkDesc.FirstTime())); err != nil { return } - if err = chunkDesc.C.Marshal(w); err != nil { + lt, err := chunkDesc.LastTime() + if err != nil { return } - p.checkpointChunksWritten.Observe(float64(chunksToPersist)) + if _, err = codable.EncodeVarint(w, int64(lt)); err != nil { + return + } + } else { + // Save (only) the non-persisted chunks. + for _, chunkDesc := range m.series.chunkDescs[m.series.persistWatermark:] { + if err = w.WriteByte(byte(chunkDesc.C.Encoding())); err != nil { + return + } + if err = chunkDesc.C.Marshal(w); err != nil { + return + } + p.checkpointChunksWritten.Observe(float64(chunksToPersist)) + } } // Series is checkpointed now, so declare it clean. In case the entire // checkpoint fails later on, this is fine, as the storage's series diff --git a/storage/local/persistence_test.go b/storage/local/persistence_test.go index e88b17592..f881dd4e6 100644 --- a/storage/local/persistence_test.go +++ b/storage/local/persistence_test.go @@ -484,7 +484,10 @@ func testCheckpointAndLoadSeriesMapAndHeads(t *testing.T, encoding chunk.Encodin s1.add(model.SamplePair{Timestamp: 1, Value: 3.14}) s3.add(model.SamplePair{Timestamp: 2, Value: 2.7}) s3.headChunkClosed = true - s3.persistWatermark = 1 + // Create another chunk in s3. 
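	// With a second closed chunk and persistWatermark set to 2, s3 has no
	// chunks left to persist, i.e. it is fully persisted but still in memory,
	// which exercises the new placeholder path in checkpointSeriesMapAndHeads.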
+ s3.add(model.SamplePair{Timestamp: 3, Value: 1.4}) + s3.headChunkClosed = true + s3.persistWatermark = 2 for i := 0; i < 10000; i++ { s4.add(model.SamplePair{ Timestamp: model.Time(i), @@ -512,8 +515,8 @@ func testCheckpointAndLoadSeriesMapAndHeads(t *testing.T, encoding chunk.Encodin if err != nil { t.Fatal(err) } - if loadedSM.length() != 3 { - t.Errorf("want 3 series in map, got %d", loadedSM.length()) + if loadedSM.length() != 4 { + t.Errorf("want 4 series in map, got %d", loadedSM.length()) } if loadedS1, ok := loadedSM.get(m1.FastFingerprint()); ok { if !reflect.DeepEqual(loadedS1.metric, m1) { @@ -537,6 +540,28 @@ func testCheckpointAndLoadSeriesMapAndHeads(t *testing.T, encoding chunk.Encodin } else { t.Errorf("couldn't find %v in loaded map", m1) } + if loadedS3, ok := loadedSM.get(m3.FastFingerprint()); ok { + if !reflect.DeepEqual(loadedS3.metric, m3) { + t.Errorf("want metric %v, got %v", m3, loadedS3.metric) + } + if loadedS3.head().C != nil { + t.Error("head chunk not evicted") + } + if loadedS3.chunkDescsOffset != 1 { + t.Errorf("want chunkDescsOffset 1, got %d", loadedS3.chunkDescsOffset) + } + if !loadedS3.headChunkClosed { + t.Error("headChunkClosed is false") + } + if loadedS3.head().ChunkFirstTime != 3 { + t.Errorf("want ChunkFirstTime in head chunk to be 3, got %d", loadedS3.head().ChunkFirstTime) + } + if loadedS3.head().ChunkLastTime != 3 { + t.Errorf("want ChunkLastTime in head chunk to be 3, got %d", loadedS3.head().ChunkLastTime) + } + } else { + t.Errorf("couldn't find %v in loaded map", m3) + } if loadedS4, ok := loadedSM.get(m4.FastFingerprint()); ok { if !reflect.DeepEqual(loadedS4.metric, m4) { t.Errorf("want metric %v, got %v", m4, loadedS4.metric)