Merge pull request #285 from prometheus/fix/storage/states

Add storage state guards and transition callbacks.
This commit is contained in:
Matt T. Proud 2013-06-06 03:41:18 -07:00
commit 0d46f6b42a
2 changed files with 125 additions and 53 deletions

View file

@ -21,11 +21,9 @@ import (
"time" "time"
) )
const ( // Assuming sample rate of 1 / 15Hz, this allows for one hour's worth of
// Assuming sample rate of 1 / 15Hz, this allows for one hour's worth of // storage per metric without any major reallocations.
// storage per metric without any major reallocations. const initialSeriesArenaSize = 4 * 60
initialSeriesArenaSize = 4 * 60
)
// Models a given sample entry stored in the in-memory arena. // Models a given sample entry stored in the in-memory arena.
type value interface { type value interface {
@ -167,15 +165,15 @@ func (s *memorySeriesStorage) AppendSamples(samples model.Samples) error {
} }
func (s *memorySeriesStorage) AppendSample(sample model.Sample) error { func (s *memorySeriesStorage) AppendSample(sample model.Sample) error {
s.Lock()
defer s.Unlock()
metric := sample.Metric metric := sample.Metric
fingerprint := model.NewFingerprintFromMetric(metric) fingerprint := model.NewFingerprintFromMetric(metric)
s.RLock()
series, ok := s.fingerprintToSeries[*fingerprint] series, ok := s.fingerprintToSeries[*fingerprint]
s.RUnlock()
if !ok { if !ok {
series = newStream(metric) series = newStream(metric)
s.Lock()
s.fingerprintToSeries[*fingerprint] = series s.fingerprintToSeries[*fingerprint] = series
for k, v := range metric { for k, v := range metric {
@ -191,8 +189,6 @@ func (s *memorySeriesStorage) AppendSample(sample model.Sample) error {
labelNameValues = append(labelNameValues, fingerprint) labelNameValues = append(labelNameValues, fingerprint)
s.labelNameToFingerprints[k] = labelNameValues s.labelNameToFingerprints[k] = labelNameValues
} }
s.Unlock()
} }
series.add(sample.Timestamp, sample.Value) series.add(sample.Timestamp, sample.Value)
@ -203,15 +199,14 @@ func (s *memorySeriesStorage) AppendSample(sample model.Sample) error {
// Append raw samples, bypassing indexing. Only used to add data to views, // Append raw samples, bypassing indexing. Only used to add data to views,
// which don't need to lookup by metric. // which don't need to lookup by metric.
func (s *memorySeriesStorage) appendSamplesWithoutIndexing(fingerprint *model.Fingerprint, samples model.Values) { func (s *memorySeriesStorage) appendSamplesWithoutIndexing(fingerprint *model.Fingerprint, samples model.Values) {
s.RLock() s.Lock()
defer s.Unlock()
series, ok := s.fingerprintToSeries[*fingerprint] series, ok := s.fingerprintToSeries[*fingerprint]
s.RUnlock()
if !ok { if !ok {
series = newStream(model.Metric{}) series = newStream(model.Metric{})
s.Lock()
s.fingerprintToSeries[*fingerprint] = series s.fingerprintToSeries[*fingerprint] = series
s.Unlock()
} }
for _, sample := range samples { for _, sample := range samples {
@ -220,9 +215,10 @@ func (s *memorySeriesStorage) appendSamplesWithoutIndexing(fingerprint *model.Fi
} }
func (s *memorySeriesStorage) GetFingerprintsForLabelSet(l model.LabelSet) (fingerprints model.Fingerprints, err error) { func (s *memorySeriesStorage) GetFingerprintsForLabelSet(l model.LabelSet) (fingerprints model.Fingerprints, err error) {
sets := []utility.Set{}
s.RLock() s.RLock()
defer s.RUnlock()
sets := []utility.Set{}
for k, v := range l { for k, v := range l {
values := s.labelPairToFingerprints[model.LabelPair{ values := s.labelPairToFingerprints[model.LabelPair{
Name: k, Name: k,
@ -234,7 +230,6 @@ func (s *memorySeriesStorage) GetFingerprintsForLabelSet(l model.LabelSet) (fing
} }
sets = append(sets, set) sets = append(sets, set)
} }
s.RUnlock()
setCount := len(sets) setCount := len(sets)
if setCount == 0 { if setCount == 0 {
@ -256,6 +251,7 @@ func (s *memorySeriesStorage) GetFingerprintsForLabelSet(l model.LabelSet) (fing
func (s *memorySeriesStorage) GetFingerprintsForLabelName(l model.LabelName) (model.Fingerprints, error) { func (s *memorySeriesStorage) GetFingerprintsForLabelName(l model.LabelName) (model.Fingerprints, error) {
s.RLock() s.RLock()
defer s.RUnlock() defer s.RUnlock()
values, ok := s.labelNameToFingerprints[l] values, ok := s.labelNameToFingerprints[l]
if !ok { if !ok {
return nil, nil return nil, nil
@ -269,8 +265,9 @@ func (s *memorySeriesStorage) GetFingerprintsForLabelName(l model.LabelName) (mo
func (s *memorySeriesStorage) GetMetricForFingerprint(f *model.Fingerprint) (model.Metric, error) { func (s *memorySeriesStorage) GetMetricForFingerprint(f *model.Fingerprint) (model.Metric, error) {
s.RLock() s.RLock()
defer s.RUnlock()
series, ok := s.fingerprintToSeries[*f] series, ok := s.fingerprintToSeries[*f]
s.RUnlock()
if !ok { if !ok {
return nil, nil return nil, nil
} }
@ -285,8 +282,9 @@ func (s *memorySeriesStorage) GetMetricForFingerprint(f *model.Fingerprint) (mod
func (s *memorySeriesStorage) CloneSamples(f *model.Fingerprint) model.Values { func (s *memorySeriesStorage) CloneSamples(f *model.Fingerprint) model.Values {
s.RLock() s.RLock()
defer s.RUnlock()
series, ok := s.fingerprintToSeries[*f] series, ok := s.fingerprintToSeries[*f]
s.RUnlock()
if !ok { if !ok {
return nil return nil
} }
@ -296,8 +294,9 @@ func (s *memorySeriesStorage) CloneSamples(f *model.Fingerprint) model.Values {
func (s *memorySeriesStorage) GetValueAtTime(f *model.Fingerprint, t time.Time) model.Values { func (s *memorySeriesStorage) GetValueAtTime(f *model.Fingerprint, t time.Time) model.Values {
s.RLock() s.RLock()
defer s.RUnlock()
series, ok := s.fingerprintToSeries[*f] series, ok := s.fingerprintToSeries[*f]
s.RUnlock()
if !ok { if !ok {
return nil return nil
} }
@ -307,8 +306,9 @@ func (s *memorySeriesStorage) GetValueAtTime(f *model.Fingerprint, t time.Time)
func (s *memorySeriesStorage) GetBoundaryValues(f *model.Fingerprint, i model.Interval) model.Values { func (s *memorySeriesStorage) GetBoundaryValues(f *model.Fingerprint, i model.Interval) model.Values {
s.RLock() s.RLock()
defer s.RUnlock()
series, ok := s.fingerprintToSeries[*f] series, ok := s.fingerprintToSeries[*f]
s.RUnlock()
if !ok { if !ok {
return nil return nil
} }
@ -318,8 +318,9 @@ func (s *memorySeriesStorage) GetBoundaryValues(f *model.Fingerprint, i model.In
func (s *memorySeriesStorage) GetRangeValues(f *model.Fingerprint, i model.Interval) model.Values { func (s *memorySeriesStorage) GetRangeValues(f *model.Fingerprint, i model.Interval) model.Values {
s.RLock() s.RLock()
defer s.RUnlock()
series, ok := s.fingerprintToSeries[*f] series, ok := s.fingerprintToSeries[*f]
s.RUnlock()
if !ok { if !ok {
return nil return nil
@ -329,12 +330,18 @@ func (s *memorySeriesStorage) GetRangeValues(f *model.Fingerprint, i model.Inter
} }
func (s *memorySeriesStorage) Close() { func (s *memorySeriesStorage) Close() {
s.Lock()
defer s.Unlock()
s.fingerprintToSeries = map[model.Fingerprint]*stream{} s.fingerprintToSeries = map[model.Fingerprint]*stream{}
s.labelPairToFingerprints = map[model.LabelPair]model.Fingerprints{} s.labelPairToFingerprints = map[model.LabelPair]model.Fingerprints{}
s.labelNameToFingerprints = map[model.LabelName]model.Fingerprints{} s.labelNameToFingerprints = map[model.LabelName]model.Fingerprints{}
} }
func (s *memorySeriesStorage) GetAllValuesForLabel(labelName model.LabelName) (values model.LabelValues, err error) { func (s *memorySeriesStorage) GetAllValuesForLabel(labelName model.LabelName) (values model.LabelValues, err error) {
s.RLock()
defer s.RUnlock()
valueSet := map[model.LabelValue]bool{} valueSet := map[model.LabelValue]bool{}
for _, series := range s.fingerprintToSeries { for _, series := range s.fingerprintToSeries {
if value, ok := series.metric[labelName]; ok { if value, ok := series.metric[labelName]; ok {
@ -344,6 +351,7 @@ func (s *memorySeriesStorage) GetAllValuesForLabel(labelName model.LabelName) (v
} }
} }
} }
return return
} }

View file

@ -53,9 +53,20 @@ func (c chunk) TruncateBefore(t time.Time) chunk {
} }
} }
type tieredStorageState uint
const (
tieredStorageStarting tieredStorageState = iota
tieredStorageServing
tieredStorageDraining
tieredStorageStopping
)
// TieredStorage both persists samples and generates materialized views for // TieredStorage both persists samples and generates materialized views for
// queries. // queries.
type TieredStorage struct { type TieredStorage struct {
mu sync.RWMutex
// BUG(matt): This introduces a Law of Demeter violation. Ugh. // BUG(matt): This introduces a Law of Demeter violation. Ugh.
DiskStorage *LevelDBMetricPersistence DiskStorage *LevelDBMetricPersistence
@ -67,9 +78,9 @@ type TieredStorage struct {
viewQueue chan viewJob viewQueue chan viewJob
draining chan chan bool draining chan chan<- bool
mutex sync.Mutex state tieredStorageState
} }
// viewJob encapsulates a request to extract sample values from the datastore. // viewJob encapsulates a request to extract sample values from the datastore.
@ -90,7 +101,7 @@ func NewTieredStorage(appendToDiskQueueDepth, viewQueueDepth uint, flushMemoryIn
storage = &TieredStorage{ storage = &TieredStorage{
appendToDiskQueue: make(chan model.Samples, appendToDiskQueueDepth), appendToDiskQueue: make(chan model.Samples, appendToDiskQueueDepth),
DiskStorage: diskStorage, DiskStorage: diskStorage,
draining: make(chan chan bool), draining: make(chan chan<- bool),
flushMemoryInterval: flushMemoryInterval, flushMemoryInterval: flushMemoryInterval,
memoryArena: NewMemorySeriesStorage(), memoryArena: NewMemorySeriesStorage(),
memoryTTL: memoryTTL, memoryTTL: memoryTTL,
@ -101,8 +112,10 @@ func NewTieredStorage(appendToDiskQueueDepth, viewQueueDepth uint, flushMemoryIn
// Enqueues Samples for storage. // Enqueues Samples for storage.
func (t *TieredStorage) AppendSamples(samples model.Samples) (err error) { func (t *TieredStorage) AppendSamples(samples model.Samples) (err error) {
if len(t.draining) > 0 { t.mu.RLock()
return fmt.Errorf("Storage is in the process of draining.") defer t.mu.RUnlock()
if t.state != tieredStorageServing {
return fmt.Errorf("Storage is not serving.")
} }
t.memoryArena.AppendSamples(samples) t.memoryArena.AppendSamples(samples)
@ -111,20 +124,28 @@ func (t *TieredStorage) AppendSamples(samples model.Samples) (err error) {
} }
// Stops the storage subsystem, flushing all pending operations. // Stops the storage subsystem, flushing all pending operations.
func (t *TieredStorage) Drain() { func (t *TieredStorage) Drain(drained chan<- bool) {
log.Println("Starting drain...") t.mu.Lock()
drainingDone := make(chan bool) defer t.mu.Unlock()
if len(t.draining) == 0 {
t.draining <- drainingDone t.drain(drained)
}
func (t *TieredStorage) drain(drained chan<- bool) {
if t.state >= tieredStorageDraining {
panic("Illegal State: Supplemental drain requested.")
} }
<-drainingDone
log.Println("Done.") log.Println("Triggering drain...")
t.draining <- (drained)
} }
// Enqueues a ViewRequestBuilder for materialization, subject to a timeout. // Enqueues a ViewRequestBuilder for materialization, subject to a timeout.
func (t *TieredStorage) MakeView(builder ViewRequestBuilder, deadline time.Duration, queryStats *stats.TimerGroup) (View, error) { func (t *TieredStorage) MakeView(builder ViewRequestBuilder, deadline time.Duration, queryStats *stats.TimerGroup) (View, error) {
if len(t.draining) > 0 { t.mu.RLock()
return nil, fmt.Errorf("Storage is in the process of draining.") defer t.mu.RUnlock()
if t.state != tieredStorageServing {
return nil, fmt.Errorf("Storage is not serving")
} }
// The result channel needs a one-element buffer in case we have timed out in // The result channel needs a one-element buffer in case we have timed out in
@ -157,6 +178,15 @@ func (t *TieredStorage) MakeView(builder ViewRequestBuilder, deadline time.Durat
// Starts serving requests. // Starts serving requests.
func (t *TieredStorage) Serve(started chan<- bool) { func (t *TieredStorage) Serve(started chan<- bool) {
t.mu.Lock()
if t.state != tieredStorageStarting {
panic("Illegal State: Attempted to restart TieredStorage.")
return
}
t.state = tieredStorageServing
t.mu.Unlock()
flushMemoryTicker := time.NewTicker(t.flushMemoryInterval) flushMemoryTicker := time.NewTicker(t.flushMemoryInterval)
defer flushMemoryTicker.Stop() defer flushMemoryTicker.Stop()
queueReportTicker := time.NewTicker(time.Second) queueReportTicker := time.NewTicker(time.Second)
@ -247,14 +277,25 @@ func (t *TieredStorage) flushMemory(ttl time.Duration) {
} }
func (t *TieredStorage) Close() { func (t *TieredStorage) Close() {
log.Println("Closing tiered storage...") t.mu.Lock()
t.Drain() defer t.mu.Unlock()
t.DiskStorage.Close()
t.memoryArena.Close()
if t.state == tieredStorageStopping {
panic("Illegal State: Attempted to restop TieredStorage.")
}
drained := make(chan bool)
t.drain(drained)
<-drained
t.memoryArena.Close()
t.DiskStorage.Close()
// BUG(matt): There is a probability that pending items may hang here and not
// get flushed.
close(t.appendToDiskQueue) close(t.appendToDiskQueue)
close(t.viewQueue) close(t.viewQueue)
log.Println("Done.")
t.state = tieredStorageStopping
} }
func (t *TieredStorage) renderView(viewJob viewJob) { func (t *TieredStorage) renderView(viewJob viewJob) {
@ -478,17 +519,25 @@ func (t *TieredStorage) loadChunkAroundTime(iterator leveldb.Iterator, frontier
} }
// Get all label values that are associated with the provided label name. // Get all label values that are associated with the provided label name.
func (t *TieredStorage) GetAllValuesForLabel(labelName model.LabelName) (values model.LabelValues, err error) { func (t *TieredStorage) GetAllValuesForLabel(labelName model.LabelName) (model.LabelValues, error) {
t.mu.RLock()
defer t.mu.RUnlock()
if t.state != tieredStorageServing {
panic("Illegal State: Attempted to query non-running TieredStorage.")
}
diskValues, err := t.DiskStorage.GetAllValuesForLabel(labelName) diskValues, err := t.DiskStorage.GetAllValuesForLabel(labelName)
if err != nil { if err != nil {
return return nil, err
} }
memoryValues, err := t.memoryArena.GetAllValuesForLabel(labelName) memoryValues, err := t.memoryArena.GetAllValuesForLabel(labelName)
if err != nil { if err != nil {
return return nil, err
} }
valueSet := map[model.LabelValue]bool{} valueSet := map[model.LabelValue]bool{}
values := model.LabelValues{}
for _, value := range append(diskValues, memoryValues...) { for _, value := range append(diskValues, memoryValues...) {
if !valueSet[value] { if !valueSet[value] {
values = append(values, value) values = append(values, value)
@ -496,40 +545,55 @@ func (t *TieredStorage) GetAllValuesForLabel(labelName model.LabelName) (values
} }
} }
return return values, nil
} }
// Get all of the metric fingerprints that are associated with the provided // Get all of the metric fingerprints that are associated with the provided
// label set. // label set.
func (t *TieredStorage) GetFingerprintsForLabelSet(labelSet model.LabelSet) (fingerprints model.Fingerprints, err error) { func (t *TieredStorage) GetFingerprintsForLabelSet(labelSet model.LabelSet) (model.Fingerprints, error) {
t.mu.RLock()
defer t.mu.RUnlock()
if t.state != tieredStorageServing {
panic("Illegal State: Attempted to query non-running TieredStorage.")
}
memFingerprints, err := t.memoryArena.GetFingerprintsForLabelSet(labelSet) memFingerprints, err := t.memoryArena.GetFingerprintsForLabelSet(labelSet)
if err != nil { if err != nil {
return return nil, err
} }
diskFingerprints, err := t.DiskStorage.GetFingerprintsForLabelSet(labelSet) diskFingerprints, err := t.DiskStorage.GetFingerprintsForLabelSet(labelSet)
if err != nil { if err != nil {
return return nil, err
} }
fingerprintSet := map[model.Fingerprint]bool{} fingerprintSet := map[model.Fingerprint]bool{}
for _, fingerprint := range append(memFingerprints, diskFingerprints...) { for _, fingerprint := range append(memFingerprints, diskFingerprints...) {
fingerprintSet[*fingerprint] = true fingerprintSet[*fingerprint] = true
} }
fingerprints := model.Fingerprints{}
for fingerprint := range fingerprintSet { for fingerprint := range fingerprintSet {
fpCopy := fingerprint fpCopy := fingerprint
fingerprints = append(fingerprints, &fpCopy) fingerprints = append(fingerprints, &fpCopy)
} }
return return fingerprints, nil
} }
// Get the metric associated with the provided fingerprint. // Get the metric associated with the provided fingerprint.
func (t *TieredStorage) GetMetricForFingerprint(f *model.Fingerprint) (m model.Metric, err error) { func (t *TieredStorage) GetMetricForFingerprint(f *model.Fingerprint) (model.Metric, error) {
m, err = t.memoryArena.GetMetricForFingerprint(f) t.mu.RLock()
defer t.mu.RUnlock()
if t.state != tieredStorageServing {
panic("Illegal State: Attempted to query non-running TieredStorage.")
}
m, err := t.memoryArena.GetMetricForFingerprint(f)
if err != nil { if err != nil {
return return nil, err
} }
if m == nil { if m == nil {
m, err = t.DiskStorage.GetMetricForFingerprint(f) m, err = t.DiskStorage.GetMetricForFingerprint(f)
} }
return return m, err
} }