diff --git a/storage/metric/helpers_test.go b/storage/metric/helpers_test.go index 8be36cdf83..2902bde51f 100644 --- a/storage/metric/helpers_test.go +++ b/storage/metric/helpers_test.go @@ -65,7 +65,7 @@ func buildLevelDBTestPersistence(name string, f func(p MetricPersistence, t test func buildMemoryTestPersistence(f func(p MetricPersistence, t test.Tester)) func(t test.Tester) { return func(t test.Tester) { - p := NewMemorySeriesStorage() + p := NewMemorySeriesStorage(MemorySeriesOptions{}) defer p.Close() diff --git a/storage/metric/interface_test.go b/storage/metric/interface_test.go index c3979126e0..eb7dd29c98 100644 --- a/storage/metric/interface_test.go +++ b/storage/metric/interface_test.go @@ -19,5 +19,5 @@ import ( func TestInterfaceAdherence(t *testing.T) { var _ MetricPersistence = &LevelDBMetricPersistence{} - var _ MetricPersistence = NewMemorySeriesStorage() + var _ MetricPersistence = NewMemorySeriesStorage(MemorySeriesOptions{}) } diff --git a/storage/metric/memory.go b/storage/metric/memory.go index 15d06764a0..1a4af813d3 100644 --- a/storage/metric/memory.go +++ b/storage/metric/memory.go @@ -151,11 +151,18 @@ func newStream(metric model.Metric) *stream { type memorySeriesStorage struct { sync.RWMutex + wmCache *WatermarkCache fingerprintToSeries map[model.Fingerprint]*stream labelPairToFingerprints map[model.LabelPair]model.Fingerprints labelNameToFingerprints map[model.LabelName]model.Fingerprints } +type MemorySeriesOptions struct { + // If provided, this WatermarkCache will be updated for any samples that are + // appended to the memorySeriesStorage. + WatermarkCache *WatermarkCache +} + func (s *memorySeriesStorage) AppendSamples(samples model.Samples) error { for _, sample := range samples { s.AppendSample(sample) @@ -172,6 +179,10 @@ func (s *memorySeriesStorage) AppendSample(sample model.Sample) error { fingerprint := model.NewFingerprintFromMetric(metric) series, ok := s.fingerprintToSeries[*fingerprint] + if s.wmCache != nil { + s.wmCache.Set(fingerprint, &Watermarks{High: sample.Timestamp}) + } + if !ok { series = newStream(metric) s.fingerprintToSeries[*fingerprint] = series @@ -355,10 +366,11 @@ func (s *memorySeriesStorage) GetAllValuesForLabel(labelName model.LabelName) (v return } -func NewMemorySeriesStorage() *memorySeriesStorage { +func NewMemorySeriesStorage(o MemorySeriesOptions) *memorySeriesStorage { return &memorySeriesStorage{ fingerprintToSeries: make(map[model.Fingerprint]*stream), labelPairToFingerprints: make(map[model.LabelPair]model.Fingerprints), labelNameToFingerprints: make(map[model.LabelName]model.Fingerprints), + wmCache: o.WatermarkCache, } } diff --git a/storage/metric/memory_test.go b/storage/metric/memory_test.go index 34514826dc..402c47dd52 100644 --- a/storage/metric/memory_test.go +++ b/storage/metric/memory_test.go @@ -48,7 +48,7 @@ func BenchmarkStreamAdd(b *testing.B) { func benchmarkAppendSample(b *testing.B, labels int) { b.StopTimer() - s := NewMemorySeriesStorage() + s := NewMemorySeriesStorage(MemorySeriesOptions{}) metric := model.Metric{} diff --git a/storage/metric/rule_integration_test.go b/storage/metric/rule_integration_test.go index bd8655756f..0673c45ba3 100644 --- a/storage/metric/rule_integration_test.go +++ b/storage/metric/rule_integration_test.go @@ -896,7 +896,7 @@ func GetRangeValuesTests(persistenceMaker func() (MetricPersistence, test.Closer func testMemoryGetValueAtTime(t test.Tester) { persistenceMaker := func() (MetricPersistence, test.Closer) { - return NewMemorySeriesStorage(), test.NilCloser + return NewMemorySeriesStorage(MemorySeriesOptions{}), test.NilCloser } GetValueAtTimeTests(persistenceMaker, t) @@ -924,7 +924,7 @@ func BenchmarkMemoryGetBoundaryValues(b *testing.B) { func testMemoryGetRangeValues(t test.Tester) { persistenceMaker := func() (MetricPersistence, test.Closer) { - return NewMemorySeriesStorage(), test.NilCloser + return NewMemorySeriesStorage(MemorySeriesOptions{}), test.NilCloser } GetRangeValuesTests(persistenceMaker, false, t) @@ -932,7 +932,7 @@ func testMemoryGetRangeValues(t test.Tester) { func testMemoryGetBoundaryValues(t test.Tester) { persistenceMaker := func() (MetricPersistence, test.Closer) { - return NewMemorySeriesStorage(), test.NilCloser + return NewMemorySeriesStorage(MemorySeriesOptions{}), test.NilCloser } GetRangeValuesTests(persistenceMaker, true, t) diff --git a/storage/metric/stochastic_test.go b/storage/metric/stochastic_test.go index 666be10cc7..58db623399 100644 --- a/storage/metric/stochastic_test.go +++ b/storage/metric/stochastic_test.go @@ -644,7 +644,7 @@ func BenchmarkMemoryAppendSampleAsPureSingleEntityAppend(b *testing.B) { func testMemoryStochastic(t test.Tester) { persistenceMaker := func() (MetricPersistence, test.Closer) { - return NewMemorySeriesStorage(), test.NilCloser + return NewMemorySeriesStorage(MemorySeriesOptions{}), test.NilCloser } StochasticTests(persistenceMaker, t) diff --git a/storage/metric/tiered.go b/storage/metric/tiered.go index 6d8e451878..3f86f4ceea 100644 --- a/storage/metric/tiered.go +++ b/storage/metric/tiered.go @@ -19,6 +19,8 @@ import ( "sort" "time" + "code.google.com/p/goprotobuf/proto" + dto "github.com/prometheus/prometheus/model/generated" "github.com/prometheus/prometheus/coding" @@ -62,6 +64,13 @@ const ( tieredStorageStopping ) +const ( + // Ignore timeseries in queries that are more stale than this limit. + stalenessLimit = time.Minute * 5 + // Size of the watermarks cache (used in determining timeseries freshness). + wmCacheSizeBytes = 5 * 1024 * 1024 +) + // TieredStorage both persists samples and generates materialized views for // queries. type TieredStorage struct { @@ -85,6 +94,8 @@ type TieredStorage struct { memorySemaphore chan bool diskSemaphore chan bool + + wmCache *WatermarkCache } // viewJob encapsulates a request to extract sample values from the datastore. @@ -107,17 +118,22 @@ func NewTieredStorage(appendToDiskQueueDepth, viewQueueDepth uint, flushMemoryIn return nil, err } + wmCache := NewWatermarkCache(wmCacheSizeBytes) + memOptions := MemorySeriesOptions{WatermarkCache: wmCache} + s := &TieredStorage{ appendToDiskQueue: make(chan model.Samples, appendToDiskQueueDepth), DiskStorage: diskStorage, draining: make(chan chan<- bool), flushMemoryInterval: flushMemoryInterval, - memoryArena: NewMemorySeriesStorage(), + memoryArena: NewMemorySeriesStorage(memOptions), memoryTTL: memoryTTL, viewQueue: make(chan viewJob, viewQueueDepth), diskSemaphore: make(chan bool, tieredDiskSemaphores), memorySemaphore: make(chan bool, tieredMemorySemaphores), + + wmCache: wmCache, } for i := 0; i < tieredDiskSemaphores; i++ { @@ -315,10 +331,38 @@ func (t *TieredStorage) Close() { // get flushed. close(t.appendToDiskQueue) close(t.viewQueue) + t.wmCache.Clear() t.state = tieredStorageStopping } +func (t *TieredStorage) seriesTooOld(f *model.Fingerprint, i time.Time) (bool, error) { + // BUG(julius): Make this configurable by query layer. + i = i.Add(-stalenessLimit) + + wm, ok := t.wmCache.Get(f) + if !ok { + rowKey := coding.NewPBEncoder(f.ToDTO()) + raw, err := t.DiskStorage.MetricHighWatermarks.Get(rowKey) + if err != nil { + return false, err + } + if raw != nil { + value := &dto.MetricHighWatermark{} + err = proto.Unmarshal(raw, value) + if err != nil { + return false, err + } + + wmTime := time.Unix(*value.Timestamp, 0).UTC() + t.wmCache.Set(f, &Watermarks{High: wmTime}) + return wmTime.Before(i), nil + } + return true, nil + } + return wm.High.Before(i), nil +} + func (t *TieredStorage) renderView(viewJob viewJob) { // Telemetry. var err error @@ -342,6 +386,15 @@ func (t *TieredStorage) renderView(viewJob viewJob) { extractionTimer := viewJob.stats.GetTimer(stats.ViewDataExtractionTime).Start() for _, scanJob := range scans { + old, err := t.seriesTooOld(scanJob.fingerprint, *scanJob.operations[0].CurrentTime()) + if err != nil { + log.Printf("Error getting watermark from cache for %s: %s", scanJob.fingerprint, err) + continue + } + if old { + continue + } + var seriesFrontier *seriesFrontier = nil var seriesPresent = true diff --git a/storage/metric/view.go b/storage/metric/view.go index 1330e6eac9..93fc45d9c7 100644 --- a/storage/metric/view.go +++ b/storage/metric/view.go @@ -110,5 +110,5 @@ func (v view) appendSamples(fingerprint *model.Fingerprint, samples model.Values } func newView() view { - return view{NewMemorySeriesStorage()} + return view{NewMemorySeriesStorage(MemorySeriesOptions{})} }