Merge pull request #738 from prometheus/fabxc/retention-cutoff

Limit retrievable samples to retention window.
This commit is contained in:
Fabian Reinartz 2015-05-27 13:26:48 +02:00
commit 078efa6e6a
3 changed files with 107 additions and 1 deletions

View file

@ -44,6 +44,8 @@ type Storage interface {
// Get the metric associated with the provided fingerprint.
MetricForFingerprint(clientmodel.Fingerprint) clientmodel.COWMetric
// Construct an iterator for a given fingerprint.
// The iterator will never return samples older than retention time,
// relative to the time NewIterator was called.
NewIterator(clientmodel.Fingerprint) SeriesIterator
// Run the various maintenance loops in goroutines. Returns when the
// storage is ready to use. Keeps everything running in the background

View file

@ -286,7 +286,47 @@ func (s *memorySeriesStorage) NewIterator(fp clientmodel.Fingerprint) SeriesIter
// return any values.
return nopSeriesIterator{}
}
return series.newIterator()
return &boundedIterator{
it: series.newIterator(),
start: clientmodel.Now().Add(-s.dropAfter),
}
}
// boundedIterator wraps a SeriesIterator and does not allow fetching
// data from earlier than the configured start time.
type boundedIterator struct {
it SeriesIterator
start clientmodel.Timestamp
}
// ValueAtTime implements the SeriesIterator interface.
func (bit *boundedIterator) ValueAtTime(ts clientmodel.Timestamp) metric.Values {
if ts < bit.start {
return metric.Values{}
}
return bit.it.ValueAtTime(ts)
}
// BoundaryValues implements the SeriesIterator interface.
func (bit *boundedIterator) BoundaryValues(interval metric.Interval) metric.Values {
if interval.NewestInclusive < bit.start {
return metric.Values{}
}
if interval.OldestInclusive < bit.start {
interval.OldestInclusive = bit.start
}
return bit.it.BoundaryValues(interval)
}
// RangeValues implements the SeriesIterator interface.
func (bit *boundedIterator) RangeValues(interval metric.Interval) metric.Values {
if interval.NewestInclusive < bit.start {
return metric.Values{}
}
if interval.OldestInclusive < bit.start {
interval.OldestInclusive = bit.start
}
return bit.it.RangeValues(interval)
}
// NewPreloader implements Storage.

View file

@ -140,6 +140,70 @@ func TestFingerprintsForLabelMatchers(t *testing.T) {
}
}
func TestRetentionCutoff(t *testing.T) {
now := clientmodel.Now()
insertStart := now.Add(-2 * time.Hour)
s, closer := NewTestStorage(t, 1)
defer closer.Close()
// Stop maintenance loop to prevent actual purging.
s.loopStopping <- struct{}{}
s.dropAfter = 1 * time.Hour
samples := make(clientmodel.Samples, 120)
for i := range samples {
smpl := &clientmodel.Sample{
Metric: clientmodel.Metric{"job": "test"},
Timestamp: insertStart.Add(time.Duration(i) * time.Minute), // 1 minute intervals.
Value: 1,
}
s.Append(smpl)
}
s.WaitForIndexing()
lm, err := metric.NewLabelMatcher(metric.Equal, "job", "test")
if err != nil {
t.Fatalf("error creating label matcher: %s", err)
}
fp := s.FingerprintsForLabelMatchers(metric.LabelMatchers{lm})[0]
pl := s.NewPreloader()
defer pl.Close()
// Preload everything.
err = pl.PreloadRange(fp, insertStart, now, 5*time.Minute)
if err != nil {
t.Fatalf("Error preloading outdated chunks: %s", err)
}
it := s.NewIterator(fp)
vals := it.ValueAtTime(now.Add(-61 * time.Minute))
if len(vals) != 0 {
t.Errorf("unexpected result for timestamp before retention period")
}
vals = it.RangeValues(metric.Interval{insertStart, now})
// We get 59 values here because the clientmodel.Now() is slightly later
// than our now.
if len(vals) != 59 {
t.Errorf("expected 59 values but got %d", len(vals))
}
if expt := now.Add(-1 * time.Hour).Add(time.Minute); vals[0].Timestamp != expt {
t.Errorf("unexpected timestamp for first sample: %v, expected %v", vals[0].Timestamp.Time(), expt.Time())
}
vals = it.BoundaryValues(metric.Interval{insertStart, now})
if len(vals) != 2 {
t.Errorf("expected 2 values but got %d", len(vals))
}
if expt := now.Add(-1 * time.Hour).Add(time.Minute); vals[0].Timestamp != expt {
t.Errorf("unexpected timestamp for first sample: %v, expected %v", vals[0].Timestamp.Time(), expt.Time())
}
}
// TestLoop is just a smoke test for the loop method, if we can switch it on and
// off without disaster.
func TestLoop(t *testing.T) {