mirror of
https://github.com/prometheus/prometheus.git
synced 2024-11-10 07:34:04 -08:00
Limit retrievable samples to retention window.
The storage does not delete data immediately after the retention period. We don't want to retrieve this data as it causes artifacts.
This commit is contained in:
parent
550ed0ea09
commit
aff01e29c3
|
@ -44,6 +44,8 @@ type Storage interface {
|
|||
// Get the metric associated with the provided fingerprint.
|
||||
MetricForFingerprint(clientmodel.Fingerprint) clientmodel.COWMetric
|
||||
// Construct an iterator for a given fingerprint.
|
||||
// The iterator will never return samples older than retention time,
|
||||
// relative to the time NewIterator was called.
|
||||
NewIterator(clientmodel.Fingerprint) SeriesIterator
|
||||
// Run the various maintenance loops in goroutines. Returns when the
|
||||
// storage is ready to use. Keeps everything running in the background
|
||||
|
|
|
@ -286,7 +286,47 @@ func (s *memorySeriesStorage) NewIterator(fp clientmodel.Fingerprint) SeriesIter
|
|||
// return any values.
|
||||
return nopSeriesIterator{}
|
||||
}
|
||||
return series.newIterator()
|
||||
return &boundedIterator{
|
||||
it: series.newIterator(),
|
||||
start: clientmodel.Now().Add(-s.dropAfter),
|
||||
}
|
||||
}
|
||||
|
||||
// boundedIterator wraps a SeriesIterator and does not allow fetching
|
||||
// data from earlier than the configured start time.
|
||||
type boundedIterator struct {
|
||||
it SeriesIterator
|
||||
start clientmodel.Timestamp
|
||||
}
|
||||
|
||||
// ValueAtTime implements the SeriesIterator interface.
|
||||
func (bit *boundedIterator) ValueAtTime(ts clientmodel.Timestamp) metric.Values {
|
||||
if ts < bit.start {
|
||||
return metric.Values{}
|
||||
}
|
||||
return bit.it.ValueAtTime(ts)
|
||||
}
|
||||
|
||||
// BoundaryValues implements the SeriesIterator interface.
|
||||
func (bit *boundedIterator) BoundaryValues(interval metric.Interval) metric.Values {
|
||||
if interval.NewestInclusive < bit.start {
|
||||
return metric.Values{}
|
||||
}
|
||||
if interval.OldestInclusive < bit.start {
|
||||
interval.OldestInclusive = bit.start
|
||||
}
|
||||
return bit.it.BoundaryValues(interval)
|
||||
}
|
||||
|
||||
// RangeValues implements the SeriesIterator interface.
|
||||
func (bit *boundedIterator) RangeValues(interval metric.Interval) metric.Values {
|
||||
if interval.NewestInclusive < bit.start {
|
||||
return metric.Values{}
|
||||
}
|
||||
if interval.OldestInclusive < bit.start {
|
||||
interval.OldestInclusive = bit.start
|
||||
}
|
||||
return bit.it.RangeValues(interval)
|
||||
}
|
||||
|
||||
// NewPreloader implements Storage.
|
||||
|
|
|
@ -140,6 +140,70 @@ func TestFingerprintsForLabelMatchers(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestRetentionCutoff(t *testing.T) {
|
||||
now := clientmodel.Now()
|
||||
insertStart := now.Add(-2 * time.Hour)
|
||||
|
||||
s, closer := NewTestStorage(t, 1)
|
||||
defer closer.Close()
|
||||
|
||||
// Stop maintenance loop to prevent actual purging.
|
||||
s.loopStopping <- struct{}{}
|
||||
|
||||
s.dropAfter = 1 * time.Hour
|
||||
|
||||
samples := make(clientmodel.Samples, 120)
|
||||
for i := range samples {
|
||||
smpl := &clientmodel.Sample{
|
||||
Metric: clientmodel.Metric{"job": "test"},
|
||||
Timestamp: insertStart.Add(time.Duration(i) * time.Minute), // 1 minute intervals.
|
||||
Value: 1,
|
||||
}
|
||||
s.Append(smpl)
|
||||
}
|
||||
s.WaitForIndexing()
|
||||
|
||||
lm, err := metric.NewLabelMatcher(metric.Equal, "job", "test")
|
||||
if err != nil {
|
||||
t.Fatalf("error creating label matcher: %s", err)
|
||||
}
|
||||
fp := s.FingerprintsForLabelMatchers(metric.LabelMatchers{lm})[0]
|
||||
|
||||
pl := s.NewPreloader()
|
||||
defer pl.Close()
|
||||
|
||||
// Preload everything.
|
||||
err = pl.PreloadRange(fp, insertStart, now, 5*time.Minute)
|
||||
if err != nil {
|
||||
t.Fatalf("Error preloading outdated chunks: %s", err)
|
||||
}
|
||||
|
||||
it := s.NewIterator(fp)
|
||||
|
||||
vals := it.ValueAtTime(now.Add(-61 * time.Minute))
|
||||
if len(vals) != 0 {
|
||||
t.Errorf("unexpected result for timestamp before retention period")
|
||||
}
|
||||
|
||||
vals = it.RangeValues(metric.Interval{insertStart, now})
|
||||
// We get 59 values here because the clientmodel.Now() is slightly later
|
||||
// than our now.
|
||||
if len(vals) != 59 {
|
||||
t.Errorf("expected 59 values but got %d", len(vals))
|
||||
}
|
||||
if expt := now.Add(-1 * time.Hour).Add(time.Minute); vals[0].Timestamp != expt {
|
||||
t.Errorf("unexpected timestamp for first sample: %v, expected %v", vals[0].Timestamp.Time(), expt.Time())
|
||||
}
|
||||
|
||||
vals = it.BoundaryValues(metric.Interval{insertStart, now})
|
||||
if len(vals) != 2 {
|
||||
t.Errorf("expected 2 values but got %d", len(vals))
|
||||
}
|
||||
if expt := now.Add(-1 * time.Hour).Add(time.Minute); vals[0].Timestamp != expt {
|
||||
t.Errorf("unexpected timestamp for first sample: %v, expected %v", vals[0].Timestamp.Time(), expt.Time())
|
||||
}
|
||||
}
|
||||
|
||||
// TestLoop is just a smoke test for the loop method, if we can switch it on and
|
||||
// off without disaster.
|
||||
func TestLoop(t *testing.T) {
|
||||
|
|
Loading…
Reference in a new issue