Use LRU cache to avoid querying stale series.

This commit is contained in:
Julius Volz 2013-06-06 18:16:22 +02:00 committed by Julius Volz
parent f98853d7b7
commit 84741b227d
8 changed files with 75 additions and 10 deletions

View file

@ -65,7 +65,7 @@ func buildLevelDBTestPersistence(name string, f func(p MetricPersistence, t test
func buildMemoryTestPersistence(f func(p MetricPersistence, t test.Tester)) func(t test.Tester) {
return func(t test.Tester) {
p := NewMemorySeriesStorage()
p := NewMemorySeriesStorage(MemorySeriesOptions{})
defer p.Close()

View file

@ -19,5 +19,5 @@ import (
// TestInterfaceAdherence assigns a LevelDBMetricPersistence pointer and the
// result of NewMemorySeriesStorage to blank MetricPersistence variables, so
// the build fails if either type stops being assignable to that interface.
func TestInterfaceAdherence(t *testing.T) {
var _ MetricPersistence = &LevelDBMetricPersistence{}
var _ MetricPersistence = NewMemorySeriesStorage()
var _ MetricPersistence = NewMemorySeriesStorage(MemorySeriesOptions{})
}

View file

@ -151,11 +151,18 @@ func newStream(metric model.Metric) *stream {
// memorySeriesStorage keeps series and their label indexes in Go maps.
// It embeds a sync.RWMutex.
// NOTE(review): presumably the mutex guards the maps below; the locking
// calls are not visible in this view, so confirm against the methods.
type memorySeriesStorage struct {
sync.RWMutex
// wmCache is optional. When non-nil, AppendSample records each appended
// sample's timestamp in it as the series' high watermark.
wmCache *WatermarkCache
// fingerprintToSeries maps a metric fingerprint to its stream.
fingerprintToSeries map[model.Fingerprint]*stream
// labelPairToFingerprints maps a label pair to a list of fingerprints.
labelPairToFingerprints map[model.LabelPair]model.Fingerprints
// labelNameToFingerprints maps a label name to a list of fingerprints.
labelNameToFingerprints map[model.LabelName]model.Fingerprints
}
// MemorySeriesOptions carries the optional settings accepted by
// NewMemorySeriesStorage. The zero value is valid and leaves every option
// unset.
type MemorySeriesOptions struct {
// If provided, this WatermarkCache will be updated for any samples that are
// appended to the memorySeriesStorage.
WatermarkCache *WatermarkCache
}
func (s *memorySeriesStorage) AppendSamples(samples model.Samples) error {
for _, sample := range samples {
s.AppendSample(sample)
@ -172,6 +179,10 @@ func (s *memorySeriesStorage) AppendSample(sample model.Sample) error {
fingerprint := model.NewFingerprintFromMetric(metric)
series, ok := s.fingerprintToSeries[*fingerprint]
if s.wmCache != nil {
s.wmCache.Set(fingerprint, &Watermarks{High: sample.Timestamp})
}
if !ok {
series = newStream(metric)
s.fingerprintToSeries[*fingerprint] = series
@ -355,10 +366,11 @@ func (s *memorySeriesStorage) GetAllValuesForLabel(labelName model.LabelName) (v
return
}
func NewMemorySeriesStorage() *memorySeriesStorage {
// NewMemorySeriesStorage returns a memorySeriesStorage whose three index maps
// are allocated and empty. o.WatermarkCache is stored as the storage's
// watermark cache; it may be nil (as with a zero-value MemorySeriesOptions),
// in which case AppendSample skips the watermark update.
func NewMemorySeriesStorage(o MemorySeriesOptions) *memorySeriesStorage {
return &memorySeriesStorage{
fingerprintToSeries: make(map[model.Fingerprint]*stream),
labelPairToFingerprints: make(map[model.LabelPair]model.Fingerprints),
labelNameToFingerprints: make(map[model.LabelName]model.Fingerprints),
wmCache: o.WatermarkCache,
}
}

View file

@ -48,7 +48,7 @@ func BenchmarkStreamAdd(b *testing.B) {
func benchmarkAppendSample(b *testing.B, labels int) {
b.StopTimer()
s := NewMemorySeriesStorage()
s := NewMemorySeriesStorage(MemorySeriesOptions{})
metric := model.Metric{}

View file

@ -896,7 +896,7 @@ func GetRangeValuesTests(persistenceMaker func() (MetricPersistence, test.Closer
func testMemoryGetValueAtTime(t test.Tester) {
persistenceMaker := func() (MetricPersistence, test.Closer) {
return NewMemorySeriesStorage(), test.NilCloser
return NewMemorySeriesStorage(MemorySeriesOptions{}), test.NilCloser
}
GetValueAtTimeTests(persistenceMaker, t)
@ -924,7 +924,7 @@ func BenchmarkMemoryGetBoundaryValues(b *testing.B) {
func testMemoryGetRangeValues(t test.Tester) {
persistenceMaker := func() (MetricPersistence, test.Closer) {
return NewMemorySeriesStorage(), test.NilCloser
return NewMemorySeriesStorage(MemorySeriesOptions{}), test.NilCloser
}
GetRangeValuesTests(persistenceMaker, false, t)
@ -932,7 +932,7 @@ func testMemoryGetRangeValues(t test.Tester) {
func testMemoryGetBoundaryValues(t test.Tester) {
persistenceMaker := func() (MetricPersistence, test.Closer) {
return NewMemorySeriesStorage(), test.NilCloser
return NewMemorySeriesStorage(MemorySeriesOptions{}), test.NilCloser
}
GetRangeValuesTests(persistenceMaker, true, t)

View file

@ -644,7 +644,7 @@ func BenchmarkMemoryAppendSampleAsPureSingleEntityAppend(b *testing.B) {
func testMemoryStochastic(t test.Tester) {
persistenceMaker := func() (MetricPersistence, test.Closer) {
return NewMemorySeriesStorage(), test.NilCloser
return NewMemorySeriesStorage(MemorySeriesOptions{}), test.NilCloser
}
StochasticTests(persistenceMaker, t)

View file

@ -19,6 +19,8 @@ import (
"sort"
"time"
"code.google.com/p/goprotobuf/proto"
dto "github.com/prometheus/prometheus/model/generated"
"github.com/prometheus/prometheus/coding"
@ -62,6 +64,13 @@ const (
tieredStorageStopping
)
const (
	// stalenessLimit is subtracted from the query time in seriesTooOld; a
	// series whose high watermark lies before the result is left out of the
	// query.
	stalenessLimit = 5 * time.Minute

	// wmCacheSizeBytes is the size handed to NewWatermarkCache for the cache
	// that backs the freshness check (5 MiB).
	wmCacheSizeBytes = 5 << 20
)
// TieredStorage both persists samples and generates materialized views for
// queries.
type TieredStorage struct {
@ -85,6 +94,8 @@ type TieredStorage struct {
memorySemaphore chan bool
diskSemaphore chan bool
wmCache *WatermarkCache
}
// viewJob encapsulates a request to extract sample values from the datastore.
@ -107,17 +118,22 @@ func NewTieredStorage(appendToDiskQueueDepth, viewQueueDepth uint, flushMemoryIn
return nil, err
}
wmCache := NewWatermarkCache(wmCacheSizeBytes)
memOptions := MemorySeriesOptions{WatermarkCache: wmCache}
s := &TieredStorage{
appendToDiskQueue: make(chan model.Samples, appendToDiskQueueDepth),
DiskStorage: diskStorage,
draining: make(chan chan<- bool),
flushMemoryInterval: flushMemoryInterval,
memoryArena: NewMemorySeriesStorage(),
memoryArena: NewMemorySeriesStorage(memOptions),
memoryTTL: memoryTTL,
viewQueue: make(chan viewJob, viewQueueDepth),
diskSemaphore: make(chan bool, tieredDiskSemaphores),
memorySemaphore: make(chan bool, tieredMemorySemaphores),
wmCache: wmCache,
}
for i := 0; i < tieredDiskSemaphores; i++ {
@ -315,10 +331,38 @@ func (t *TieredStorage) Close() {
// get flushed.
close(t.appendToDiskQueue)
close(t.viewQueue)
t.wmCache.Clear()
t.state = tieredStorageStopping
}
// seriesTooOld reports whether the series identified by f is stale for a
// query evaluated at time i, i.e. whether its high watermark lies more than
// stalenessLimit before i.
//
// The watermark is looked up in t.wmCache first. On a cache miss it is read
// from t.DiskStorage.MetricHighWatermarks, decoded, and written back into the
// cache before being compared.
//
// A series with no stored watermark record, or whose record carries no
// timestamp, is reported as too old. If the lookup or the decode fails, the
// error is returned together with false.
func (t *TieredStorage) seriesTooOld(f *model.Fingerprint, i time.Time) (bool, error) {
	// BUG(julius): Make this configurable by query layer.
	cutoff := i.Add(-stalenessLimit)

	// Fast path: the watermark is already cached.
	if wm, ok := t.wmCache.Get(f); ok {
		return wm.High.Before(cutoff), nil
	}

	rowKey := coding.NewPBEncoder(f.ToDTO())
	raw, err := t.DiskStorage.MetricHighWatermarks.Get(rowKey)
	if err != nil {
		return false, err
	}
	if raw == nil {
		// No watermark was ever recorded for this fingerprint.
		return true, nil
	}

	value := &dto.MetricHighWatermark{}
	if err := proto.Unmarshal(raw, value); err != nil {
		return false, err
	}
	if value.Timestamp == nil {
		// Timestamp is a pointer field and stays nil when the stored record
		// omits it; dereferencing it would panic. Treat the record like a
		// missing watermark instead.
		return true, nil
	}

	wmTime := time.Unix(*value.Timestamp, 0).UTC()
	t.wmCache.Set(f, &Watermarks{High: wmTime})
	return wmTime.Before(cutoff), nil
}
func (t *TieredStorage) renderView(viewJob viewJob) {
// Telemetry.
var err error
@ -342,6 +386,15 @@ func (t *TieredStorage) renderView(viewJob viewJob) {
extractionTimer := viewJob.stats.GetTimer(stats.ViewDataExtractionTime).Start()
for _, scanJob := range scans {
old, err := t.seriesTooOld(scanJob.fingerprint, *scanJob.operations[0].CurrentTime())
if err != nil {
log.Printf("Error getting watermark from cache for %s: %s", scanJob.fingerprint, err)
continue
}
if old {
continue
}
var seriesFrontier *seriesFrontier = nil
var seriesPresent = true

View file

@ -110,5 +110,5 @@ func (v view) appendSamples(fingerprint *model.Fingerprint, samples model.Values
}
// newView returns a view that wraps a newly created memorySeriesStorage
// obtained from NewMemorySeriesStorage.
func newView() view {
return view{NewMemorySeriesStorage()}
return view{NewMemorySeriesStorage(MemorySeriesOptions{})}
}