From daeccdd0e94f4265a31be0c9576bb449de475b6c Mon Sep 17 00:00:00 2001 From: beorn7 Date: Fri, 11 Sep 2015 15:47:23 +0200 Subject: [PATCH] Fix DropMetricsForFingerprints It now deletes the series file also for archived series. Also, fix a naming error in a doc comment. --- storage/local/persistence.go | 2 +- storage/local/storage.go | 7 ++-- storage/local/storage_test.go | 73 +++++++++++++++++++++++++---------- 3 files changed, 58 insertions(+), 24 deletions(-) diff --git a/storage/local/persistence.go b/storage/local/persistence.go index 2dda63a77..5eaf0b13d 100644 --- a/storage/local/persistence.go +++ b/storage/local/persistence.go @@ -1059,7 +1059,7 @@ func (p *persistence) indexMetric(fp model.Fingerprint, m model.Metric) { // indexes used for fingerprintsForLabelPair, labelValuesForLabelName, and // fingerprintsModifiedBefore. The index of fingerprints to archived metrics is // not affected by this removal. (In fact, never call this method for an -// archived metric. To purge an archived metric, call purgeArchivedFingerprint.) +// archived metric. To purge an archived metric, call purgeArchivedMetric.) // If the queue is full, this method blocks until the metric can be queued. This // method is goroutine-safe. func (p *persistence) unindexMetric(fp model.Fingerprint, m model.Metric) { diff --git a/storage/local/storage.go b/storage/local/storage.go index 11581af49..41c55be13 100644 --- a/storage/local/storage.go +++ b/storage/local/storage.go @@ -518,12 +518,13 @@ func (s *memorySeriesStorage) DropMetricsForFingerprints(fps ...model.Fingerprin s.fpToSeries.del(fp) s.numSeries.Dec() s.persistence.unindexMetric(fp, series.metric) - if _, err := s.persistence.deleteSeriesFile(fp); err != nil { - log.Errorf("Error deleting series file for %v: %v", fp, err) - } } else if err := s.persistence.purgeArchivedMetric(fp); err != nil { log.Errorf("Error purging metric with fingerprint %v: %v", fp, err) } + // Attempt to delete series file in any case. + if _, err := s.persistence.deleteSeriesFile(fp); err != nil { + log.Errorf("Error deleting series file for %v: %v", fp, err) + } s.fpLocker.Unlock(fp) } diff --git a/storage/local/storage_test.go b/storage/local/storage_test.go index a3f11de65..a8ea0a7c3 100644 --- a/storage/local/storage_test.go +++ b/storage/local/storage_test.go @@ -17,6 +17,7 @@ import ( "fmt" "hash/fnv" "math/rand" + "os" "reflect" "testing" "testing/quick" @@ -438,16 +439,29 @@ func TestDropMetrics(t *testing.T) { s, closer := NewTestStorage(t, 1) defer closer.Close() + chunkFileExists := func(fp model.Fingerprint) (bool, error) { + f, err := s.persistence.openChunkFileForReading(fp) + if err == nil { + f.Close() + return true, nil + } + if os.IsNotExist(err) { + return false, nil + } + return false, err + } + m1 := model.Metric{model.MetricNameLabel: "test", "n1": "v1"} m2 := model.Metric{model.MetricNameLabel: "test", "n1": "v2"} + m3 := model.Metric{model.MetricNameLabel: "test", "n1": "v3"} N := 120000 - for j, m := range []model.Metric{m1, m2} { + for j, m := range []model.Metric{m1, m2, m3} { for i := 0; i < N; i++ { smpl := &model.Sample{ Metric: m, - Timestamp: insertStart.Add(time.Duration(i) * time.Millisecond), // 1 minute intervals. + Timestamp: insertStart.Add(time.Duration(i) * time.Millisecond), // 1 millisecond intervals. Value: model.SampleValue(j), } s.Append(smpl) @@ -455,19 +469,24 @@ func TestDropMetrics(t *testing.T) { } s.WaitForIndexing() + // Archive m3, but first maintain it so that at least something is written to disk. + fpToBeArchived := m3.FastFingerprint() + s.maintainMemorySeries(fpToBeArchived, 0) + s.fpLocker.Lock(fpToBeArchived) + s.fpToSeries.del(fpToBeArchived) + if err := s.persistence.archiveMetric( + fpToBeArchived, m3, 0, insertStart.Add(time.Duration(N-1)*time.Millisecond), + ); err != nil { + t.Error(err) + } + s.fpLocker.Unlock(fpToBeArchived) + fps := s.fingerprintsForLabelPairs(model.LabelPair{Name: model.MetricNameLabel, Value: "test"}) - if len(fps) != 2 { - t.Fatalf("unexpected number of fingerprints: %d", len(fps)) + if len(fps) != 3 { + t.Errorf("unexpected number of fingerprints: %d", len(fps)) } - var fpList model.Fingerprints - for fp := range fps { - it := s.NewIterator(fp) - if vals := it.RangeValues(metric.Interval{OldestInclusive: insertStart, NewestInclusive: now}); len(vals) != N { - t.Fatalf("unexpected number of samples: %d", len(vals)) - } - fpList = append(fpList, fp) - } + fpList := model.Fingerprints{m1.FastFingerprint(), m2.FastFingerprint(), fpToBeArchived} s.DropMetricsForFingerprints(fpList[0]) s.WaitForIndexing() @@ -475,17 +494,24 @@ func TestDropMetrics(t *testing.T) { fps2 := s.fingerprintsForLabelPairs(model.LabelPair{ Name: model.MetricNameLabel, Value: "test", }) - if len(fps2) != 1 { - t.Fatalf("unexpected number of fingerprints: %d", len(fps2)) + if len(fps2) != 2 { + t.Errorf("unexpected number of fingerprints: %d", len(fps2)) } it := s.NewIterator(fpList[0]) if vals := it.RangeValues(metric.Interval{OldestInclusive: insertStart, NewestInclusive: now}); len(vals) != 0 { - t.Fatalf("unexpected number of samples: %d", len(vals)) + t.Errorf("unexpected number of samples: %d", len(vals)) } it = s.NewIterator(fpList[1]) if vals := it.RangeValues(metric.Interval{OldestInclusive: insertStart, NewestInclusive: now}); len(vals) != N { - t.Fatalf("unexpected number of samples: %d", len(vals)) + t.Errorf("unexpected number of samples: %d", len(vals)) + } + exists, err := chunkFileExists(fpList[2]) + if err != nil { + t.Fatal(err) + } + if !exists { + t.Errorf("chunk file does not exist for fp=%v", fpList[2]) } s.DropMetricsForFingerprints(fpList...) @@ -495,16 +521,23 @@ func TestDropMetrics(t *testing.T) { Name: model.MetricNameLabel, Value: "test", }) if len(fps3) != 0 { - t.Fatalf("unexpected number of fingerprints: %d", len(fps3)) + t.Errorf("unexpected number of fingerprints: %d", len(fps3)) } it = s.NewIterator(fpList[0]) if vals := it.RangeValues(metric.Interval{OldestInclusive: insertStart, NewestInclusive: now}); len(vals) != 0 { - t.Fatalf("unexpected number of samples: %d", len(vals)) + t.Errorf("unexpected number of samples: %d", len(vals)) } it = s.NewIterator(fpList[1]) if vals := it.RangeValues(metric.Interval{OldestInclusive: insertStart, NewestInclusive: now}); len(vals) != 0 { - t.Fatalf("unexpected number of samples: %d", len(vals)) + t.Errorf("unexpected number of samples: %d", len(vals)) + } + exists, err = chunkFileExists(fpList[2]) + if err != nil { + t.Fatal(err) + } + if exists { + t.Errorf("chunk file still exists for fp=%v", fpList[2]) } } @@ -533,7 +566,7 @@ func TestLoop(t *testing.T) { } storage := NewMemorySeriesStorage(o) if err := storage.Start(); err != nil { - t.Fatalf("Error starting storage: %s", err) + t.Errorf("Error starting storage: %s", err) } for _, s := range samples { storage.Append(s)