diff --git a/storage/local/interface.go b/storage/local/interface.go index 0cb7e48b7..308bee8bf 100644 --- a/storage/local/interface.go +++ b/storage/local/interface.go @@ -47,6 +47,9 @@ type Storage interface { // The iterator will never return samples older than retention time, // relative to the time NewIterator was called. NewIterator(clientmodel.Fingerprint) SeriesIterator + // Drop all time series associated with the given fingerprints. This operation + // will not show up in the series operations metrics. + DropMetricsForFingerprints(...clientmodel.Fingerprint) // Run the various maintenance loops in goroutines. Returns when the // storage is ready to use. Keeps everything running in the background // until Stop is called. diff --git a/storage/local/storage.go b/storage/local/storage.go index 12324e081..549cb6b39 100644 --- a/storage/local/storage.go +++ b/storage/local/storage.go @@ -430,6 +430,26 @@ func (s *memorySeriesStorage) MetricForFingerprint(fp clientmodel.Fingerprint) c } } +// DropMetric implements Storage. +func (s *memorySeriesStorage) DropMetricsForFingerprints(fps ...clientmodel.Fingerprint) { + for _, fp := range fps { + s.fpLocker.Lock(fp) + + if series, ok := s.fpToSeries.get(fp); ok { + s.fpToSeries.del(fp) + s.numSeries.Dec() + s.persistence.unindexMetric(fp, series.metric) + if _, err := s.persistence.deleteSeriesFile(fp); err != nil { + log.Errorf("Error deleting series file for %v: %v", fp, err) + } + } else if err := s.persistence.purgeArchivedMetric(fp); err != nil { + log.Errorf("Error purging metric with fingerprint %v: %v", fp, err) + } + + s.fpLocker.Unlock(fp) + } +} + // Append implements Storage. func (s *memorySeriesStorage) Append(sample *clientmodel.Sample) { if s.getNumChunksToPersist() >= s.maxChunksToPersist { @@ -694,7 +714,7 @@ func (s *memorySeriesStorage) cycleThroughArchivedFingerprints() chan clientmode for { archivedFPs, err := s.persistence.fingerprintsModifiedBefore( - clientmodel.TimestampFromTime(time.Now()).Add(-s.dropAfter), + clientmodel.Now().Add(-s.dropAfter), ) if err != nil { log.Error("Failed to lookup archived fingerprint ranges: ", err) @@ -750,7 +770,7 @@ loop: dirtySeriesCount = 0 checkpointTimer.Reset(s.checkpointInterval) case fp := <-memoryFingerprints: - if s.maintainMemorySeries(fp, clientmodel.TimestampFromTime(time.Now()).Add(-s.dropAfter)) { + if s.maintainMemorySeries(fp, clientmodel.Now().Add(-s.dropAfter)) { dirtySeriesCount++ // Check if we have enough "dirty" series so that we need an early checkpoint. // However, if we are already behind persisting chunks, creating a checkpoint @@ -764,7 +784,7 @@ loop: } } case fp := <-archivedFingerprints: - s.maintainArchivedSeries(fp, clientmodel.TimestampFromTime(time.Now()).Add(-s.dropAfter)) + s.maintainArchivedSeries(fp, clientmodel.Now().Add(-s.dropAfter)) } } // Wait until both channels are closed. diff --git a/storage/local/storage_test.go b/storage/local/storage_test.go index 54c113378..959d25711 100644 --- a/storage/local/storage_test.go +++ b/storage/local/storage_test.go @@ -152,8 +152,7 @@ func TestRetentionCutoff(t *testing.T) { s.dropAfter = 1 * time.Hour - samples := make(clientmodel.Samples, 120) - for i := range samples { + for i := 0; i < 120; i++ { smpl := &clientmodel.Sample{ Metric: clientmodel.Metric{"job": "test"}, Timestamp: insertStart.Add(time.Duration(i) * time.Minute), // 1 minute intervals. @@ -204,6 +203,85 @@ func TestRetentionCutoff(t *testing.T) { } } +func TestDropMetrics(t *testing.T) { + now := clientmodel.Now() + insertStart := now.Add(-2 * time.Hour) + + s, closer := NewTestStorage(t, 1) + defer closer.Close() + + m1 := clientmodel.Metric{clientmodel.MetricNameLabel: "test", "n1": "v1"} + m2 := clientmodel.Metric{clientmodel.MetricNameLabel: "test", "n1": "v2"} + + N := 120000 + + for j, m := range []clientmodel.Metric{m1, m2} { + for i := 0; i < N; i++ { + smpl := &clientmodel.Sample{ + Metric: m, + Timestamp: insertStart.Add(time.Duration(i) * time.Millisecond), // 1 minute intervals. + Value: clientmodel.SampleValue(j), + } + s.Append(smpl) + } + } + s.WaitForIndexing() + + matcher := metric.LabelMatchers{{ + Type: metric.Equal, + Name: clientmodel.MetricNameLabel, + Value: "test", + }} + + fps := s.FingerprintsForLabelMatchers(matcher) + if len(fps) != 2 { + t.Fatalf("unexpected number of fingerprints: %d", len(fps)) + } + + it := s.NewIterator(fps[0]) + if vals := it.RangeValues(metric.Interval{insertStart, now}); len(vals) != N { + t.Fatalf("unexpected number of samples: %d", len(vals)) + } + it = s.NewIterator(fps[1]) + if vals := it.RangeValues(metric.Interval{insertStart, now}); len(vals) != N { + t.Fatalf("unexpected number of samples: %d", len(vals)) + } + + s.DropMetricsForFingerprints(fps[0]) + s.WaitForIndexing() + + fps2 := s.FingerprintsForLabelMatchers(matcher) + if len(fps2) != 1 { + t.Fatalf("unexpected number of fingerprints: %d", len(fps2)) + } + + it = s.NewIterator(fps[0]) + if vals := it.RangeValues(metric.Interval{insertStart, now}); len(vals) != 0 { + t.Fatalf("unexpected number of samples: %d", len(vals)) + } + it = s.NewIterator(fps[1]) + if vals := it.RangeValues(metric.Interval{insertStart, now}); len(vals) != N { + t.Fatalf("unexpected number of samples: %d", len(vals)) + } + + s.DropMetricsForFingerprints(fps...) + s.WaitForIndexing() + + fps3 := s.FingerprintsForLabelMatchers(matcher) + if len(fps3) != 0 { + t.Fatalf("unexpected number of fingerprints: %d", len(fps3)) + } + + it = s.NewIterator(fps[0]) + if vals := it.RangeValues(metric.Interval{insertStart, now}); len(vals) != 0 { + t.Fatalf("unexpected number of samples: %d", len(vals)) + } + it = s.NewIterator(fps[1]) + if vals := it.RangeValues(metric.Interval{insertStart, now}); len(vals) != 0 { + t.Fatalf("unexpected number of samples: %d", len(vals)) + } +} + // TestLoop is just a smoke test for the loop method, if we can switch it on and // off without disaster. func TestLoop(t *testing.T) {