prometheus/storage/local/storage.go
Bjoern Rabenstein ecdf5ab14f Index-persistence switched from gob to a hand-coded solution.
Change-Id: Ib4ec42535bd08df16d34d4774bb638e35c5a1841
2014-11-25 17:02:00 +01:00

504 lines
12 KiB
Go

package storage_ng
import (
"sync"
"time"
"github.com/golang/glog"
clientmodel "github.com/prometheus/client_golang/model"
"github.com/prometheus/prometheus/storage/metric"
"github.com/prometheus/prometheus/utility"
)
const persistQueueCap = 1024
type storageState uint
const (
storageStarting storageState = iota
storageServing
storageStopping
)
type memorySeriesStorage struct {
mtx sync.RWMutex
state storageState
persistDone chan bool
stopServing chan chan<- bool
fingerprintToSeries map[clientmodel.Fingerprint]*memorySeries
labelPairToFingerprints map[metric.LabelPair]utility.Set
labelNameToLabelValues map[clientmodel.LabelName]utility.Set
memoryEvictionInterval time.Duration
memoryRetentionPeriod time.Duration
persistencePurgeInterval time.Duration
persistenceRetentionPeriod time.Duration
persistQueue chan *persistRequest
persistence Persistence
}
type MemorySeriesStorageOptions struct {
Persistence Persistence
MemoryEvictionInterval time.Duration
MemoryRetentionPeriod time.Duration
PersistencePurgeInterval time.Duration
PersistenceRetentionPeriod time.Duration
}
func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) (*memorySeriesStorage, error) { // TODO: change to return Storage?
glog.Info("Loading indexes...")
i, err := o.Persistence.LoadIndexes()
if err != nil {
return nil, err
}
glog.Info("Loading series head chunks...")
if err := o.Persistence.LoadHeads(i.FingerprintToSeries); err != nil {
return nil, err
}
numSeries.Set(float64(len(i.FingerprintToSeries)))
return &memorySeriesStorage{
fingerprintToSeries: i.FingerprintToSeries,
labelPairToFingerprints: i.LabelPairToFingerprints,
labelNameToLabelValues: i.LabelNameToLabelValues,
persistDone: make(chan bool),
stopServing: make(chan chan<- bool),
memoryEvictionInterval: o.MemoryEvictionInterval,
memoryRetentionPeriod: o.MemoryRetentionPeriod,
persistencePurgeInterval: o.PersistencePurgeInterval,
persistenceRetentionPeriod: o.PersistenceRetentionPeriod,
persistQueue: make(chan *persistRequest, persistQueueCap),
persistence: o.Persistence,
}, nil
}
type persistRequest struct {
fingerprint clientmodel.Fingerprint
chunkDesc *chunkDesc
}
func (s *memorySeriesStorage) AppendSamples(samples clientmodel.Samples) {
/*
s.mtx.Lock()
defer s.mtx.Unlock()
if s.state != storageServing {
panic("storage is not serving")
}
s.mtx.Unlock()
*/
for _, sample := range samples {
s.appendSample(sample)
}
numSamples.Add(float64(len(samples)))
}
func (s *memorySeriesStorage) appendSample(sample *clientmodel.Sample) {
s.mtx.Lock()
defer s.mtx.Unlock()
series := s.getOrCreateSeries(sample.Metric)
series.add(&metric.SamplePair{
Value: sample.Value,
Timestamp: sample.Timestamp,
}, s.persistQueue)
}
func (s *memorySeriesStorage) getOrCreateSeries(m clientmodel.Metric) *memorySeries {
fp := m.Fingerprint()
series, ok := s.fingerprintToSeries[fp]
if !ok {
series = newMemorySeries(m)
s.fingerprintToSeries[fp] = series
numSeries.Set(float64(len(s.fingerprintToSeries)))
for k, v := range m {
labelPair := metric.LabelPair{
Name: k,
Value: v,
}
fps, ok := s.labelPairToFingerprints[labelPair]
if !ok {
fps = utility.Set{}
s.labelPairToFingerprints[labelPair] = fps
}
fps.Add(fp)
values, ok := s.labelNameToLabelValues[k]
if !ok {
values = utility.Set{}
s.labelNameToLabelValues[k] = values
}
values.Add(v)
}
}
return series
}
/*
func (s *memorySeriesStorage) preloadChunksAtTime(fp clientmodel.Fingerprint, ts clientmodel.Timestamp) (chunkDescs, error) {
series, ok := s.fingerprintToSeries[fp]
if !ok {
panic("requested preload for non-existent series")
}
return series.preloadChunksAtTime(ts, s.persistence)
}
*/
func (s *memorySeriesStorage) preloadChunksForRange(fp clientmodel.Fingerprint, from clientmodel.Timestamp, through clientmodel.Timestamp) (chunkDescs, error) {
s.mtx.RLock()
series, ok := s.fingerprintToSeries[fp]
s.mtx.RUnlock()
if !ok {
panic("requested preload for non-existent series")
}
return series.preloadChunksForRange(from, through, s.persistence)
}
func (s *memorySeriesStorage) NewIterator(fp clientmodel.Fingerprint) SeriesIterator {
s.mtx.RLock()
series, ok := s.fingerprintToSeries[fp]
s.mtx.RUnlock()
if !ok {
panic("requested iterator for non-existent series")
}
return series.newIterator()
}
func (s *memorySeriesStorage) evictMemoryChunks(ttl time.Duration) {
s.mtx.RLock()
defer s.mtx.RUnlock()
for _, series := range s.fingerprintToSeries {
series.evictOlderThan(clientmodel.TimestampFromTime(time.Now()).Add(-1 * ttl))
}
}
func recordPersist(start time.Time, err error) {
outcome := success
if err != nil {
outcome = failure
}
persistLatencies.WithLabelValues(outcome).Observe(float64(time.Since(start) / time.Millisecond))
}
func (s *memorySeriesStorage) handlePersistQueue() {
for req := range s.persistQueue {
// TODO: Make this thread-safe?
persistQueueLength.Set(float64(len(s.persistQueue)))
//glog.Info("Persist request: ", *req.fingerprint)
start := time.Now()
err := s.persistence.PersistChunk(req.fingerprint, req.chunkDesc.chunk)
recordPersist(start, err)
if err != nil {
glog.Error("Error persisting chunk, requeuing: ", err)
s.persistQueue <- req
continue
}
req.chunkDesc.unpin()
}
s.persistDone <- true
}
// Close stops serving, flushes all pending operations, and frees all resources.
func (s *memorySeriesStorage) Close() error {
s.mtx.Lock()
defer s.mtx.Unlock()
if s.state == storageStopping {
panic("Illegal State: Attempted to restop memorySeriesStorage.")
}
stopped := make(chan bool)
glog.Info("Waiting for storage to stop serving...")
s.stopServing <- (stopped)
glog.Info("Serving stopped.")
<-stopped
glog.Info("Stopping persist loop...")
close(s.persistQueue)
<-s.persistDone
glog.Info("Persist loop stopped.")
glog.Info("Persisting head chunks...")
if err := s.persistHeads(); err != nil {
return err
}
glog.Info("Done persisting head chunks.")
glog.Info("Persisting indexes...")
if err := s.persistIndexes(); err != nil {
return err
}
glog.Info("Done persisting indexes.")
for _, series := range s.fingerprintToSeries {
series.close()
}
s.fingerprintToSeries = nil
s.state = storageStopping
return nil
}
func (s *memorySeriesStorage) persistHeads() error {
return s.persistence.PersistHeads(s.fingerprintToSeries)
}
func (s *memorySeriesStorage) persistIndexes() error {
err := s.persistence.PersistIndexes(&Indexes{
FingerprintToSeries: s.fingerprintToSeries,
LabelPairToFingerprints: s.labelPairToFingerprints,
LabelNameToLabelValues: s.labelNameToLabelValues,
})
return err
}
func (s *memorySeriesStorage) purgePeriodically(stop <-chan bool) {
purgeTicker := time.NewTicker(s.persistencePurgeInterval)
defer purgeTicker.Stop()
for {
select {
case <-stop:
return
case <-purgeTicker.C:
glog.Info("Purging old series data...")
s.mtx.RLock()
fps := make([]clientmodel.Fingerprint, 0, len(s.fingerprintToSeries))
for fp := range s.fingerprintToSeries {
fps = append(fps, fp)
}
s.mtx.RUnlock()
for _, fp := range fps {
select {
case <-stop:
glog.Info("Interrupted running series purge.")
return
default:
s.purgeSeries(&fp)
}
}
glog.Info("Done purging old series data.")
}
}
}
func (s *memorySeriesStorage) purgeSeries(fp *clientmodel.Fingerprint) {
s.mtx.RLock()
series, ok := s.fingerprintToSeries[*fp]
if !ok {
return
}
s.mtx.RUnlock()
drop, err := series.purgeOlderThan(clientmodel.TimestampFromTime(time.Now()).Add(-1*s.persistenceRetentionPeriod), s.persistence)
if err != nil {
glog.Error("Error purging series data: ", err)
}
if drop {
s.dropSeries(fp)
}
}
// Drop a label value from the label names to label values index.
func (s *memorySeriesStorage) dropLabelValue(l clientmodel.LabelName, v clientmodel.LabelValue) {
if set, ok := s.labelNameToLabelValues[l]; ok {
set.Remove(v)
if len(set) == 0 {
delete(s.labelNameToLabelValues, l)
}
}
}
// Drop all references to a series, including any samples.
func (s *memorySeriesStorage) dropSeries(fp *clientmodel.Fingerprint) {
s.mtx.Lock()
defer s.mtx.Unlock()
series, ok := s.fingerprintToSeries[*fp]
if !ok {
return
}
for k, v := range series.metric {
labelPair := metric.LabelPair{
Name: k,
Value: v,
}
if set, ok := s.labelPairToFingerprints[labelPair]; ok {
set.Remove(*fp)
if len(set) == 0 {
delete(s.labelPairToFingerprints, labelPair)
s.dropLabelValue(k, v)
}
}
}
delete(s.fingerprintToSeries, *fp)
}
func (s *memorySeriesStorage) Serve(started chan<- bool) {
s.mtx.Lock()
if s.state != storageStarting {
panic("Illegal State: Attempted to restart memorySeriesStorage.")
}
s.state = storageServing
s.mtx.Unlock()
evictMemoryTicker := time.NewTicker(s.memoryEvictionInterval)
defer evictMemoryTicker.Stop()
go s.handlePersistQueue()
stopPurge := make(chan bool)
go s.purgePeriodically(stopPurge)
started <- true
for {
select {
case <-evictMemoryTicker.C:
s.evictMemoryChunks(s.memoryRetentionPeriod)
case stopped := <-s.stopServing:
stopPurge <- true
stopped <- true
return
}
}
}
func (s *memorySeriesStorage) NewPreloader() Preloader {
return &memorySeriesPreloader{
storage: s,
}
}
func (s *memorySeriesStorage) GetFingerprintsForLabelMatchers(labelMatchers metric.LabelMatchers) clientmodel.Fingerprints {
s.mtx.RLock()
defer s.mtx.RUnlock()
sets := []utility.Set{}
for _, matcher := range labelMatchers {
switch matcher.Type {
case metric.Equal:
set, ok := s.labelPairToFingerprints[metric.LabelPair{
Name: matcher.Name,
Value: matcher.Value,
}]
if !ok {
return nil
}
sets = append(sets, set)
default:
values := s.getLabelValuesForLabelName(matcher.Name)
matches := matcher.Filter(values)
if len(matches) == 0 {
return nil
}
set := utility.Set{}
for _, v := range matches {
subset, ok := s.labelPairToFingerprints[metric.LabelPair{
Name: matcher.Name,
Value: v,
}]
if !ok {
return nil
}
for fp := range subset {
set.Add(fp)
}
}
sets = append(sets, set)
}
}
setCount := len(sets)
if setCount == 0 {
return nil
}
base := sets[0]
for i := 1; i < setCount; i++ {
base = base.Intersection(sets[i])
}
fingerprints := clientmodel.Fingerprints{}
for _, e := range base.Elements() {
fingerprints = append(fingerprints, e.(clientmodel.Fingerprint))
}
return fingerprints
}
func (s *memorySeriesStorage) GetLabelValuesForLabelName(labelName clientmodel.LabelName) clientmodel.LabelValues {
s.mtx.RLock()
defer s.mtx.RUnlock()
return s.getLabelValuesForLabelName(labelName)
}
func (s *memorySeriesStorage) getLabelValuesForLabelName(labelName clientmodel.LabelName) clientmodel.LabelValues {
set, ok := s.labelNameToLabelValues[labelName]
if !ok {
return nil
}
values := make(clientmodel.LabelValues, 0, len(set))
for e := range set {
val := e.(clientmodel.LabelValue)
values = append(values, val)
}
return values
}
func (s *memorySeriesStorage) GetMetricForFingerprint(fp clientmodel.Fingerprint) clientmodel.Metric {
s.mtx.RLock()
defer s.mtx.RUnlock()
series, ok := s.fingerprintToSeries[fp]
if !ok {
return nil
}
metric := clientmodel.Metric{}
for label, value := range series.metric {
metric[label] = value
}
return metric
}
func (s *memorySeriesStorage) GetAllValuesForLabel(labelName clientmodel.LabelName) clientmodel.LabelValues {
s.mtx.RLock()
defer s.mtx.RUnlock()
var values clientmodel.LabelValues
valueSet := map[clientmodel.LabelValue]struct{}{}
for _, series := range s.fingerprintToSeries {
if value, ok := series.metric[labelName]; ok {
if _, ok := valueSet[value]; !ok {
values = append(values, value)
valueSet[value] = struct{}{}
}
}
}
return values
}