Merge pull request #265 from prometheus/feature/memory-arena-simplification

Move In-Memory Arena Away from Skiplist
This commit is contained in:
Matt T. Proud 2013-05-22 09:10:52 -07:00
commit 556be84c73
7 changed files with 219 additions and 290 deletions

View file

@ -21,7 +21,7 @@ advice:
go tool vet . go tool vet .
binary: build binary: build
go build $(BUILDFLAGS) . go build -o prometheus $(BUILDFLAGS) .
build: preparation config model tools web build: preparation config model tools web

View file

@ -50,9 +50,6 @@ type MetricPersistence interface {
GetValueAtTime(*model.Fingerprint, time.Time) model.Values GetValueAtTime(*model.Fingerprint, time.Time) model.Values
GetBoundaryValues(*model.Fingerprint, model.Interval) (first model.Values, second model.Values) GetBoundaryValues(*model.Fingerprint, model.Interval) (first model.Values, second model.Values)
GetRangeValues(*model.Fingerprint, model.Interval) model.Values GetRangeValues(*model.Fingerprint, model.Interval) model.Values
ForEachSample(IteratorsForFingerprintBuilder) (err error)
// Get all label values that are associated with a given label name. // Get all label values that are associated with a given label name.
GetAllValuesForLabel(model.LabelName) (model.LabelValues, error) GetAllValuesForLabel(model.LabelName) (model.LabelValues, error)

View file

@ -863,10 +863,6 @@ func (l *LevelDBMetricPersistence) GetAllValuesForLabel(labelName model.LabelNam
return return
} }
func (l *LevelDBMetricPersistence) ForEachSample(builder IteratorsForFingerprintBuilder) (err error) {
panic("not implemented")
}
// CompactKeyspace compacts each database's keyspace serially. // CompactKeyspace compacts each database's keyspace serially.
// //
// Beware that it would probably be imprudent to run this on a live user-facing // Beware that it would probably be imprudent to run this on a live user-facing

View file

@ -15,12 +15,18 @@ package metric
import ( import (
"github.com/prometheus/prometheus/model" "github.com/prometheus/prometheus/model"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/utility" "github.com/prometheus/prometheus/utility"
"github.com/ryszard/goskiplist/skiplist" "sort"
"sync"
"time" "time"
) )
const (
// Assuming sample rate of 1 / 15Hz, this allows for one hour's worth of
// storage per metric without any major reallocations.
initialSeriesArenaSize = 4 * 60
)
// Models a given sample entry stored in the in-memory arena. // Models a given sample entry stored in the in-memory arena.
type value interface { type value interface {
// Gets the given value. // Gets the given value.
@ -35,69 +41,99 @@ func (v singletonValue) get() model.SampleValue {
return model.SampleValue(v) return model.SampleValue(v)
} }
type skipListTime time.Time
func (t skipListTime) LessThan(o skiplist.Ordered) bool {
return time.Time(o.(skipListTime)).Before(time.Time(t))
}
type stream struct { type stream struct {
sync.RWMutex
metric model.Metric metric model.Metric
values *skiplist.SkipList values model.Values
} }
func (s *stream) add(timestamp time.Time, value model.SampleValue) { func (s *stream) add(timestamp time.Time, value model.SampleValue) {
s.values.Set(skipListTime(timestamp), singletonValue(value)) s.Lock()
defer s.Unlock()
// BUG(all): https://github.com/prometheus/prometheus/pull/265/files#r4336435.
s.values = append(s.values, model.SamplePair{
Timestamp: timestamp,
Value: value,
})
} }
func (s *stream) forEach(decoder storage.RecordDecoder, filter storage.RecordFilter, operator storage.RecordOperator) (scannedEntireCorpus bool, err error) { func (s *stream) clone() model.Values {
if s.values.Len() == 0 { s.RLock()
return false, nil defer s.RUnlock()
// BUG(all): Examine COW technique.
clone := make(model.Values, len(s.values))
copy(clone, s.values)
return clone
}
func (s *stream) getValueAtTime(t time.Time) model.Values {
s.RLock()
defer s.RUnlock()
// BUG(all): May be avenues for simplification.
l := len(s.values)
switch l {
case 0:
return model.Values{}
case 1:
return model.Values{s.values[0]}
default:
index := sort.Search(l, func(i int) bool {
return !s.values[i].Timestamp.Before(t)
})
if index == 0 {
return model.Values{s.values[0]}
}
if index == l {
return model.Values{s.values[l-1]}
}
if s.values[index].Timestamp.Equal(t) {
return model.Values{s.values[index]}
}
return model.Values{s.values[index-1], s.values[index]}
} }
iterator := s.values.SeekToLast() }
defer iterator.Close() func (s *stream) getBoundaryValues(i model.Interval) (model.Values, model.Values) {
return s.getValueAtTime(i.OldestInclusive), s.getValueAtTime(i.NewestInclusive)
}
for !(iterator.Key() == nil || iterator.Value() == nil) { func (s *stream) getRangeValues(in model.Interval) model.Values {
decodedKey, decodeErr := decoder.DecodeKey(iterator.Key()) s.RLock()
if decodeErr != nil { defer s.RUnlock()
panic(decodeErr)
}
decodedValue, decodeErr := decoder.DecodeValue(iterator.Value())
if decodeErr != nil {
panic(decodeErr)
}
switch filter.Filter(decodedKey, decodedValue) { oldest := sort.Search(len(s.values), func(i int) bool {
case storage.STOP: return !s.values[i].Timestamp.Before(in.OldestInclusive)
return false, nil })
case storage.SKIP:
continue
case storage.ACCEPT:
opErr := operator.Operate(decodedKey, decodedValue)
if opErr != nil {
if opErr.Continuable {
continue
}
break
}
}
if !iterator.Previous() {
break
}
}
return true, nil newest := sort.Search(len(s.values), func(i int) bool {
return s.values[i].Timestamp.After(in.NewestInclusive)
})
result := make(model.Values, newest-oldest)
copy(result, s.values[oldest:newest])
return result
} }
func newStream(metric model.Metric) *stream { func newStream(metric model.Metric) *stream {
return &stream{ return &stream{
values: skiplist.New(),
metric: metric, metric: metric,
values: make(model.Values, 0, initialSeriesArenaSize),
} }
} }
type memorySeriesStorage struct { type memorySeriesStorage struct {
sync.RWMutex
fingerprintToSeries map[model.Fingerprint]*stream fingerprintToSeries map[model.Fingerprint]*stream
labelPairToFingerprints map[model.LabelPair]model.Fingerprints labelPairToFingerprints map[model.LabelPair]model.Fingerprints
labelNameToFingerprints map[model.LabelName]model.Fingerprints labelNameToFingerprints map[model.LabelName]model.Fingerprints
@ -114,10 +150,13 @@ func (s *memorySeriesStorage) AppendSamples(samples model.Samples) error {
func (s *memorySeriesStorage) AppendSample(sample model.Sample) error { func (s *memorySeriesStorage) AppendSample(sample model.Sample) error {
metric := sample.Metric metric := sample.Metric
fingerprint := model.NewFingerprintFromMetric(metric) fingerprint := model.NewFingerprintFromMetric(metric)
s.RLock()
series, ok := s.fingerprintToSeries[*fingerprint] series, ok := s.fingerprintToSeries[*fingerprint]
s.RUnlock()
if !ok { if !ok {
series = newStream(metric) series = newStream(metric)
s.Lock()
s.fingerprintToSeries[*fingerprint] = series s.fingerprintToSeries[*fingerprint] = series
for k, v := range metric { for k, v := range metric {
@ -133,6 +172,8 @@ func (s *memorySeriesStorage) AppendSample(sample model.Sample) error {
labelNameValues = append(labelNameValues, fingerprint) labelNameValues = append(labelNameValues, fingerprint)
s.labelNameToFingerprints[k] = labelNameValues s.labelNameToFingerprints[k] = labelNameValues
} }
s.Unlock()
} }
series.add(sample.Timestamp, sample.Value) series.add(sample.Timestamp, sample.Value)
@ -143,20 +184,24 @@ func (s *memorySeriesStorage) AppendSample(sample model.Sample) error {
// Append raw sample, bypassing indexing. Only used to add data to views, which // Append raw sample, bypassing indexing. Only used to add data to views, which
// don't need to lookup by metric. // don't need to lookup by metric.
func (s *memorySeriesStorage) appendSampleWithoutIndexing(f *model.Fingerprint, timestamp time.Time, value model.SampleValue) { func (s *memorySeriesStorage) appendSampleWithoutIndexing(f *model.Fingerprint, timestamp time.Time, value model.SampleValue) {
s.RLock()
series, ok := s.fingerprintToSeries[*f] series, ok := s.fingerprintToSeries[*f]
s.RUnlock()
if !ok { if !ok {
series = newStream(model.Metric{}) series = newStream(model.Metric{})
s.Lock()
s.fingerprintToSeries[*f] = series s.fingerprintToSeries[*f] = series
s.Unlock()
} }
series.add(timestamp, value) series.add(timestamp, value)
} }
func (s *memorySeriesStorage) GetFingerprintsForLabelSet(l model.LabelSet) (fingerprints model.Fingerprints, err error) { func (s *memorySeriesStorage) GetFingerprintsForLabelSet(l model.LabelSet) (fingerprints model.Fingerprints, err error) {
sets := []utility.Set{} sets := []utility.Set{}
s.RLock()
for k, v := range l { for k, v := range l {
values := s.labelPairToFingerprints[model.LabelPair{ values := s.labelPairToFingerprints[model.LabelPair{
Name: k, Name: k,
@ -168,6 +213,7 @@ func (s *memorySeriesStorage) GetFingerprintsForLabelSet(l model.LabelSet) (fing
} }
sets = append(sets, set) sets = append(sets, set)
} }
s.RUnlock()
setCount := len(sets) setCount := len(sets)
if setCount == 0 { if setCount == 0 {
@ -186,16 +232,24 @@ func (s *memorySeriesStorage) GetFingerprintsForLabelSet(l model.LabelSet) (fing
return fingerprints, nil return fingerprints, nil
} }
func (s *memorySeriesStorage) GetFingerprintsForLabelName(l model.LabelName) (fingerprints model.Fingerprints, _ error) { func (s *memorySeriesStorage) GetFingerprintsForLabelName(l model.LabelName) (model.Fingerprints, error) {
values := s.labelNameToFingerprints[l] s.RLock()
defer s.RUnlock()
values, ok := s.labelNameToFingerprints[l]
if !ok {
return nil, nil
}
fingerprints = append(fingerprints, values...) fingerprints := make(model.Fingerprints, len(values))
copy(fingerprints, values)
return fingerprints, nil return fingerprints, nil
} }
func (s *memorySeriesStorage) GetMetricForFingerprint(f *model.Fingerprint) (model.Metric, error) { func (s *memorySeriesStorage) GetMetricForFingerprint(f *model.Fingerprint) (model.Metric, error) {
s.RLock()
series, ok := s.fingerprintToSeries[*f] series, ok := s.fingerprintToSeries[*f]
s.RUnlock()
if !ok { if !ok {
return nil, nil return nil, nil
} }
@ -208,91 +262,49 @@ func (s *memorySeriesStorage) GetMetricForFingerprint(f *model.Fingerprint) (mod
return metric, nil return metric, nil
} }
func (s *memorySeriesStorage) GetValueAtTime(f *model.Fingerprint, t time.Time) (samples model.Values) { func (s *memorySeriesStorage) CloneSamples(f *model.Fingerprint) model.Values {
s.RLock()
series, ok := s.fingerprintToSeries[*f] series, ok := s.fingerprintToSeries[*f]
s.RUnlock()
if !ok { if !ok {
return samples return nil
} }
iterator := series.values.Seek(skipListTime(t)) return series.clone()
if iterator == nil { }
// If the iterator is nil, it means we seeked past the end of the series,
// so we seek to the last value instead. Due to the reverse ordering func (s *memorySeriesStorage) GetValueAtTime(f *model.Fingerprint, t time.Time) model.Values {
// defined on skipListTime, this corresponds to the sample with the s.RLock()
// earliest timestamp. series, ok := s.fingerprintToSeries[*f]
iterator = series.values.SeekToLast() s.RUnlock()
if iterator == nil { if !ok {
// The list is empty. return nil
return samples
}
} }
defer iterator.Close() return series.getValueAtTime(t)
if iterator.Key() == nil || iterator.Value() == nil {
return samples
}
foundTime := time.Time(iterator.Key().(skipListTime))
samples = append(samples, model.SamplePair{
Timestamp: foundTime,
Value: iterator.Value().(value).get(),
})
if foundTime.Before(t) && iterator.Previous() {
samples = append(samples, model.SamplePair{
Timestamp: time.Time(iterator.Key().(skipListTime)),
Value: iterator.Value().(value).get(),
})
}
return samples
} }
func (s *memorySeriesStorage) GetBoundaryValues(f *model.Fingerprint, i model.Interval) (model.Values, model.Values) { func (s *memorySeriesStorage) GetBoundaryValues(f *model.Fingerprint, i model.Interval) (model.Values, model.Values) {
return s.GetValueAtTime(f, i.OldestInclusive), s.GetValueAtTime(f, i.NewestInclusive) s.RLock()
series, ok := s.fingerprintToSeries[*f]
s.RUnlock()
if !ok {
return nil, nil
}
return series.getBoundaryValues(i)
} }
func (s *memorySeriesStorage) GetRangeValues(f *model.Fingerprint, i model.Interval) (samples model.Values) { func (s *memorySeriesStorage) GetRangeValues(f *model.Fingerprint, i model.Interval) model.Values {
s.RLock()
series, ok := s.fingerprintToSeries[*f] series, ok := s.fingerprintToSeries[*f]
s.RUnlock()
if !ok { if !ok {
return samples return nil
} }
iterator := series.values.Seek(skipListTime(i.OldestInclusive)) return series.getRangeValues(i)
if iterator == nil {
// If the iterator is nil, it means we seeked past the end of the series,
// so we seek to the last value instead. Due to the reverse ordering
// defined on skipListTime, this corresponds to the sample with the
// earliest timestamp.
iterator = series.values.SeekToLast()
if iterator == nil {
// The list is empty.
return samples
}
}
defer iterator.Close()
for {
timestamp := time.Time(iterator.Key().(skipListTime))
if timestamp.After(i.NewestInclusive) {
break
}
if !timestamp.Before(i.OldestInclusive) {
samples = append(samples, model.SamplePair{
Value: iterator.Value().(value).get(),
Timestamp: timestamp,
})
}
if !iterator.Previous() {
break
}
}
return samples
} }
func (s *memorySeriesStorage) Close() { func (s *memorySeriesStorage) Close() {
@ -314,16 +326,6 @@ func (s *memorySeriesStorage) GetAllValuesForLabel(labelName model.LabelName) (v
return return
} }
func (s *memorySeriesStorage) ForEachSample(builder IteratorsForFingerprintBuilder) (err error) {
for _, stream := range s.fingerprintToSeries {
decoder, filter, operator := builder.ForStream(stream)
stream.forEach(decoder, filter, operator)
}
return
}
func NewMemorySeriesStorage() *memorySeriesStorage { func NewMemorySeriesStorage() *memorySeriesStorage {
return &memorySeriesStorage{ return &memorySeriesStorage{
fingerprintToSeries: make(map[model.Fingerprint]*stream), fingerprintToSeries: make(map[model.Fingerprint]*stream),

View file

@ -339,11 +339,11 @@ func GetValueAtTimeTests(persistenceMaker func() (MetricPersistence, test.Closer
actual := p.GetValueAtTime(model.NewFingerprintFromMetric(m), time) actual := p.GetValueAtTime(model.NewFingerprintFromMetric(m), time)
if len(behavior.output) != len(actual) { if len(behavior.output) != len(actual) {
t.Fatalf("%d.%d(%s). Expected %d samples but got: %v\n", i, j, behavior.name, len(behavior.output), actual) t.Fatalf("%d.%d(%s.%s). Expected %d samples but got: %v\n", i, j, context.name, behavior.name, len(behavior.output), actual)
} }
for k, samplePair := range actual { for k, samplePair := range actual {
if samplePair.Value != behavior.output[k] { if samplePair.Value != behavior.output[k] {
t.Fatalf("%d.%d.%d(%s). Expected %s but got %s\n", i, j, k, behavior.name, behavior.output[k], samplePair) t.Fatalf("%d.%d.%d(%s.%s). Expected %s but got %s\n", i, j, k, context.name, behavior.name, behavior.output[k], samplePair)
} }
} }

View file

@ -22,6 +22,7 @@ import (
"github.com/prometheus/prometheus/utility/test" "github.com/prometheus/prometheus/utility/test"
"math" "math"
"math/rand" "math/rand"
"sort"
"testing" "testing"
"testing/quick" "testing/quick"
"time" "time"
@ -223,6 +224,20 @@ func levelDBGetRangeValues(l *LevelDBMetricPersistence, fp *model.Fingerprint, i
return return
} }
type timeslice []time.Time
func (t timeslice) Len() int {
return len(t)
}
func (t timeslice) Swap(i, j int) {
t[i], t[j] = t[j], t[i]
}
func (t timeslice) Less(i, j int) bool {
return t[i].Before(t[j])
}
func StochasticTests(persistenceMaker func() (MetricPersistence, test.Closer), t test.Tester) { func StochasticTests(persistenceMaker func() (MetricPersistence, test.Closer), t test.Tester) {
stochastic := func(x int) (success bool) { stochastic := func(x int) (success bool) {
p, closer := persistenceMaker() p, closer := persistenceMaker()
@ -266,11 +281,9 @@ func StochasticTests(persistenceMaker func() (MetricPersistence, test.Closer), t
timestamps := map[int64]bool{} timestamps := map[int64]bool{}
metricTimestamps[metricIndex] = timestamps metricTimestamps[metricIndex] = timestamps
var ( var newestSample int64 = math.MinInt64
newestSample int64 = math.MinInt64 var oldestSample int64 = math.MaxInt64
oldestSample int64 = math.MaxInt64 var nextTimestamp func() int64
nextTimestamp func() int64
)
nextTimestamp = func() int64 { nextTimestamp = func() int64 {
var candidate int64 var candidate int64
@ -294,8 +307,15 @@ func StochasticTests(persistenceMaker func() (MetricPersistence, test.Closer), t
return candidate return candidate
} }
// BUG(matt): Invariant of the in-memory database assumes this.
sortedTimestamps := timeslice{}
for sampleIndex := 0; sampleIndex < numberOfSamples; sampleIndex++ { for sampleIndex := 0; sampleIndex < numberOfSamples; sampleIndex++ {
sample.Timestamp = time.Unix(nextTimestamp(), 0) sortedTimestamps = append(sortedTimestamps, time.Unix(nextTimestamp(), 0))
}
sort.Sort(sortedTimestamps)
for sampleIndex := 0; sampleIndex < numberOfSamples; sampleIndex++ {
sample.Timestamp = sortedTimestamps[sampleIndex]
sample.Value = model.SampleValue(sampleIndex) sample.Value = model.SampleValue(sampleIndex)
err := p.AppendSample(sample) err := p.AppendSample(sample)

View file

@ -19,7 +19,6 @@ import (
"github.com/prometheus/prometheus/coding/indexable" "github.com/prometheus/prometheus/coding/indexable"
"github.com/prometheus/prometheus/model" "github.com/prometheus/prometheus/model"
dto "github.com/prometheus/prometheus/model/generated" dto "github.com/prometheus/prometheus/model/generated"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/storage/raw/leveldb" "github.com/prometheus/prometheus/storage/raw/leveldb"
"log" "log"
"sort" "sort"
@ -65,13 +64,6 @@ type TieredStorage struct {
memoryTTL time.Duration memoryTTL time.Duration
flushMemoryInterval time.Duration flushMemoryInterval time.Duration
// This mutex manages any concurrent reads/writes of the memoryArena.
memoryMutex sync.RWMutex
// This mutex blocks only deletions from the memoryArena. It is held for a
// potentially long time for an entire renderView() duration, since we depend
// on no samples being removed from memory after grabbing a LevelDB snapshot.
memoryDeleteMutex sync.RWMutex
viewQueue chan viewJob viewQueue chan viewJob
draining chan chan bool draining chan chan bool
@ -111,9 +103,7 @@ func (t *TieredStorage) AppendSamples(samples model.Samples) (err error) {
return fmt.Errorf("Storage is in the process of draining.") return fmt.Errorf("Storage is in the process of draining.")
} }
t.memoryMutex.Lock()
t.memoryArena.AppendSamples(samples) t.memoryArena.AppendSamples(samples)
t.memoryMutex.Unlock()
return return
} }
@ -130,10 +120,9 @@ func (t *TieredStorage) Drain() {
} }
// Enqueues a ViewRequestBuilder for materialization, subject to a timeout. // Enqueues a ViewRequestBuilder for materialization, subject to a timeout.
func (t *TieredStorage) MakeView(builder ViewRequestBuilder, deadline time.Duration) (view View, err error) { func (t *TieredStorage) MakeView(builder ViewRequestBuilder, deadline time.Duration) (View, error) {
if len(t.draining) > 0 { if len(t.draining) > 0 {
err = fmt.Errorf("Storage is in the process of draining.") return nil, fmt.Errorf("Storage is in the process of draining.")
return
} }
// The result channel needs a one-element buffer in case we have timed out in // The result channel needs a one-element buffer in case we have timed out in
@ -152,16 +141,14 @@ func (t *TieredStorage) MakeView(builder ViewRequestBuilder, deadline time.Durat
} }
select { select {
case value := <-result: case view := <-result:
view = value return view, nil
case err = <-errChan: case err := <-errChan:
return return nil, err
case <-time.After(deadline): case <-time.After(deadline):
abortChan <- true abortChan <- true
err = fmt.Errorf("MakeView timed out after %s.", deadline) return nil, fmt.Errorf("MakeView timed out after %s.", deadline)
} }
return
} }
func (t *TieredStorage) rebuildDiskFrontier(i leveldb.Iterator) (err error) { func (t *TieredStorage) rebuildDiskFrontier(i leveldb.Iterator) (err error) {
@ -195,7 +182,7 @@ func (t *TieredStorage) Serve() {
for { for {
select { select {
case <-flushMemoryTicker.C: case <-flushMemoryTicker.C:
t.flushMemory() t.flushMemory(t.memoryTTL)
case viewRequest := <-t.viewQueue: case viewRequest := <-t.viewQueue:
t.renderView(viewRequest) t.renderView(viewRequest)
case drainingDone := <-t.draining: case drainingDone := <-t.draining:
@ -215,7 +202,56 @@ func (t *TieredStorage) reportQueues() {
} }
func (t *TieredStorage) Flush() { func (t *TieredStorage) Flush() {
t.flushMemory() t.flushMemory(0)
}
func (t *TieredStorage) flushMemory(ttl time.Duration) {
t.memoryArena.RLock()
defer t.memoryArena.RUnlock()
cutOff := time.Now().Add(-1 * ttl)
log.Println("Flushing...")
for _, stream := range t.memoryArena.fingerprintToSeries {
finder := func(i int) bool {
return !cutOff.After(stream.values[i].Timestamp)
}
stream.Lock()
i := sort.Search(len(stream.values), finder)
toArchive := stream.values[i:]
toKeep := stream.values[:i]
queued := make(model.Samples, 0, len(toArchive))
for _, value := range toArchive {
queued = append(queued, model.Sample{
Metric: stream.metric,
Timestamp: value.Timestamp,
Value: value.Value,
})
}
t.appendToDiskQueue <- queued
stream.values = toKeep
stream.Unlock()
}
queueLength := len(t.appendToDiskQueue)
if queueLength > 0 {
log.Printf("Writing %d samples ...", queueLength)
samples := model.Samples{}
for i := 0; i < queueLength; i++ {
chunk := <-t.appendToDiskQueue
samples = append(samples, chunk...)
}
t.DiskStorage.AppendSamples(samples)
}
log.Println("Done flushing...")
} }
func (t *TieredStorage) Close() { func (t *TieredStorage) Close() {
@ -229,118 +265,6 @@ func (t *TieredStorage) Close() {
log.Println("Done.") log.Println("Done.")
} }
type memoryToDiskFlusher struct {
toDiskQueue chan model.Samples
disk MetricPersistence
olderThan time.Time
valuesAccepted int
valuesRejected int
memoryDeleteMutex *sync.RWMutex
}
type memoryToDiskFlusherVisitor struct {
stream *stream
flusher *memoryToDiskFlusher
memoryDeleteMutex *sync.RWMutex
}
func (f memoryToDiskFlusherVisitor) DecodeKey(in interface{}) (out interface{}, err error) {
out = time.Time(in.(skipListTime))
return
}
func (f memoryToDiskFlusherVisitor) DecodeValue(in interface{}) (out interface{}, err error) {
out = in.(value).get()
return
}
func (f memoryToDiskFlusherVisitor) Filter(key, value interface{}) (filterResult storage.FilterResult) {
var (
recordTime = key.(time.Time)
)
if recordTime.Before(f.flusher.olderThan) {
f.flusher.valuesAccepted++
return storage.ACCEPT
}
f.flusher.valuesRejected++
return storage.STOP
}
func (f memoryToDiskFlusherVisitor) Operate(key, value interface{}) (err *storage.OperatorError) {
var (
recordTime = key.(time.Time)
recordValue = value.(model.SampleValue)
)
if len(f.flusher.toDiskQueue) == cap(f.flusher.toDiskQueue) {
f.flusher.Flush()
}
f.flusher.toDiskQueue <- model.Samples{
model.Sample{
Metric: f.stream.metric,
Timestamp: recordTime,
Value: recordValue,
},
}
f.memoryDeleteMutex.Lock()
f.stream.values.Delete(skipListTime(recordTime))
f.memoryDeleteMutex.Unlock()
return
}
func (f *memoryToDiskFlusher) ForStream(stream *stream) (decoder storage.RecordDecoder, filter storage.RecordFilter, operator storage.RecordOperator) {
visitor := memoryToDiskFlusherVisitor{
stream: stream,
flusher: f,
memoryDeleteMutex: f.memoryDeleteMutex,
}
return visitor, visitor, visitor
}
func (f *memoryToDiskFlusher) Flush() {
length := len(f.toDiskQueue)
samples := model.Samples{}
for i := 0; i < length; i++ {
samples = append(samples, <-f.toDiskQueue...)
}
f.disk.AppendSamples(samples)
}
func (f memoryToDiskFlusher) Close() {
f.Flush()
}
// Persist a whole bunch of samples from memory to the datastore.
func (t *TieredStorage) flushMemory() {
begin := time.Now()
defer func() {
duration := time.Since(begin)
recordOutcome(duration, nil, map[string]string{operation: appendSample, result: success}, map[string]string{operation: flushMemory, result: failure})
}()
t.memoryMutex.RLock()
defer t.memoryMutex.RUnlock()
flusher := &memoryToDiskFlusher{
disk: t.DiskStorage,
olderThan: time.Now().Add(-1 * t.memoryTTL),
toDiskQueue: t.appendToDiskQueue,
memoryDeleteMutex: &t.memoryDeleteMutex,
}
defer flusher.Close()
t.memoryArena.ForEachSample(flusher)
return
}
func (t *TieredStorage) renderView(viewJob viewJob) { func (t *TieredStorage) renderView(viewJob viewJob) {
// Telemetry. // Telemetry.
var err error var err error
@ -351,10 +275,6 @@ func (t *TieredStorage) renderView(viewJob viewJob) {
recordOutcome(duration, err, map[string]string{operation: renderView, result: success}, map[string]string{operation: renderView, result: failure}) recordOutcome(duration, err, map[string]string{operation: renderView, result: success}, map[string]string{operation: renderView, result: failure})
}() }()
// No samples may be deleted from memory while rendering a view.
t.memoryDeleteMutex.RLock()
defer t.memoryDeleteMutex.RUnlock()
scans := viewJob.builder.ScanJobs() scans := viewJob.builder.ScanJobs()
view := newView() view := newView()
// Get a single iterator that will be used for all data extraction below. // Get a single iterator that will be used for all data extraction below.
@ -378,6 +298,8 @@ func (t *TieredStorage) renderView(viewJob viewJob) {
} }
standingOps := scanJob.operations standingOps := scanJob.operations
memValues := t.memoryArena.CloneSamples(scanJob.fingerprint)
for len(standingOps) > 0 { for len(standingOps) > 0 {
// Abort the view rendering if the caller (MakeView) has timed out. // Abort the view rendering if the caller (MakeView) has timed out.
if len(viewJob.abort) > 0 { if len(viewJob.abort) > 0 {
@ -388,16 +310,8 @@ func (t *TieredStorage) renderView(viewJob viewJob) {
targetTime := *standingOps[0].CurrentTime() targetTime := *standingOps[0].CurrentTime()
currentChunk := chunk{} currentChunk := chunk{}
t.memoryMutex.RLock()
memValues := t.memoryArena.GetValueAtTime(scanJob.fingerprint, targetTime)
t.memoryMutex.RUnlock()
// If we aimed before the oldest value in memory, load more data from disk. // If we aimed before the oldest value in memory, load more data from disk.
if (len(memValues) == 0 || memValues.FirstTimeAfter(targetTime)) && seriesFrontier != nil { if (len(memValues) == 0 || memValues.FirstTimeAfter(targetTime)) && seriesFrontier != nil {
// XXX: For earnest performance gains analagous to the benchmarking we
// performed, chunk should only be reloaded if it no longer contains
// the values we're looking for.
//
// To better understand this, look at https://github.com/prometheus/prometheus/blob/benchmark/leveldb/iterator-seek-characteristics/leveldb.go#L239 and note the behavior around retrievedValue.
diskValues := t.loadChunkAroundTime(iterator, seriesFrontier, scanJob.fingerprint, targetTime) diskValues := t.loadChunkAroundTime(iterator, seriesFrontier, scanJob.fingerprint, targetTime)
// If we aimed past the newest value on disk, combine it with the next value from memory. // If we aimed past the newest value on disk, combine it with the next value from memory.