prometheus/storage/local/storage.go
Bjoern Rabenstein 5a128a04a9 Major reorganization of the storage.
Most important, the heads file will now persist all the chunk descs,
too. Implicitly, it will serve as the persisted form of the
fp-to-series map.

Change-Id: Ic867e78f2714d54c3b5733939cc5aef43f7bd08d
2014-11-25 17:02:01 +01:00

package storage_ng

import (
	"sync"
	"time"

	"github.com/golang/glog"

	clientmodel "github.com/prometheus/client_golang/model"

	"github.com/prometheus/prometheus/storage/metric"
)

const persistQueueCap = 1024

type storageState uint

const (
	storageStarting storageState = iota
	storageServing
	storageStopping
)

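// memorySeriesStorage keeps all currently open series in memory (in a
// SeriesMap keyed by fingerprint) and delegates chunk persistence, archiving,
// purging, and indexing to the configured Persistence.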
type memorySeriesStorage struct {
	mtx sync.RWMutex

	state       storageState
	persistDone chan bool
	stopServing chan chan<- bool

	fingerprintToSeries SeriesMap

	memoryEvictionInterval time.Duration
	memoryRetentionPeriod  time.Duration

	persistencePurgeInterval   time.Duration
	persistenceRetentionPeriod time.Duration

	persistQueue chan *persistRequest
	persistence  Persistence
}

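// MemorySeriesStorageOptions bundles the parameters for creating a memory
// series storage: the Persistence backend, how often and how long chunks are
// kept in memory, and how often and how long chunks are kept on disk.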
type MemorySeriesStorageOptions struct {
	Persistence                Persistence
	MemoryEvictionInterval     time.Duration
	MemoryRetentionPeriod      time.Duration
	PersistencePurgeInterval   time.Duration
	PersistenceRetentionPeriod time.Duration
}

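// NewMemorySeriesStorage returns a newly allocated Storage backed by the
// Persistence given in the options. It loads the series map and the head
// chunks from disk. The returned storage is expected to be started via Serve
// (which launches the persist and purge loops) and shut down via Close.
//
// A rough usage sketch (the Persistence value p and the interval settings
// below are placeholders chosen by the caller, not values defined in this
// file):
//
//	storage, err := NewMemorySeriesStorage(&MemorySeriesStorageOptions{
//		Persistence:                p,
//		MemoryEvictionInterval:     time.Minute,
//		MemoryRetentionPeriod:      time.Hour,
//		PersistencePurgeInterval:   time.Hour,
//		PersistenceRetentionPeriod: 15 * 24 * time.Hour,
//	})
//	if err != nil {
//		// Handle the error.
//	}
//	started := make(chan bool)
//	go storage.Serve(started)
//	<-started
//	// ... append samples, run queries ...
//	if err := storage.Close(); err != nil {
//		// Handle the error.
//	}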
func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) (Storage, error) {
	glog.Info("Loading series map and head chunks...")
	fingerprintToSeries, err := o.Persistence.LoadSeriesMapAndHeads()
	if err != nil {
		return nil, err
	}
	numSeries.Set(float64(len(fingerprintToSeries)))

	return &memorySeriesStorage{
		fingerprintToSeries: fingerprintToSeries,
		persistDone:         make(chan bool),
		stopServing:         make(chan chan<- bool),

		memoryEvictionInterval: o.MemoryEvictionInterval,
		memoryRetentionPeriod:  o.MemoryRetentionPeriod,

		persistencePurgeInterval:   o.PersistencePurgeInterval,
		persistenceRetentionPeriod: o.PersistenceRetentionPeriod,

		persistQueue: make(chan *persistRequest, persistQueueCap),
		persistence:  o.Persistence,
	}, nil
}

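// persistRequest tells the persist loop to write the chunk held by chunkDesc
// to disk under the series identified by fingerprint.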
type persistRequest struct {
	fingerprint clientmodel.Fingerprint
	chunkDesc   *chunkDesc
}

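// AppendSamples appends every sample in the given slice to its series,
// creating series that are not yet in memory as needed.
//
// A minimal sketch of constructing samples to append (the label pair is an
// arbitrary example, and the Sample literal assumes the clientmodel types as
// they are used elsewhere in this file):
//
//	storage.AppendSamples(clientmodel.Samples{
//		&clientmodel.Sample{
//			Metric:    clientmodel.Metric{"job": "demo"},
//			Value:     3.14,
//			Timestamp: clientmodel.TimestampFromTime(time.Now()),
//		},
//	})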
func (s *memorySeriesStorage) AppendSamples(samples clientmodel.Samples) {
	/*
		s.mtx.Lock()
		defer s.mtx.Unlock()
		if s.state != storageServing {
			panic("storage is not serving")
		}
		s.mtx.Unlock()
	*/
	for _, sample := range samples {
		s.appendSample(sample)
	}

	numSamples.Add(float64(len(samples)))
}

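// appendSample adds a single sample to its series under the storage's write
// lock, creating (or unarchiving) the series if it is not in memory yet.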
func (s *memorySeriesStorage) appendSample(sample *clientmodel.Sample) {
	s.mtx.Lock()
	defer s.mtx.Unlock()

	series := s.getOrCreateSeries(sample.Metric)
	series.add(&metric.SamplePair{
		Value:     sample.Value,
		Timestamp: sample.Timestamp,
	}, s.persistQueue)
}

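// getOrCreateSeries returns the memorySeries for the given metric, creating
// it if it does not exist in memory yet. A series that was archived on disk
// is unarchived (so its chunk descs will be loaded later); a genuinely new
// series is added to the index. The caller must hold the storage's write
// lock.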
func (s *memorySeriesStorage) getOrCreateSeries(m clientmodel.Metric) *memorySeries {
	fp := m.Fingerprint()
	series, ok := s.fingerprintToSeries[fp]
	if !ok {
		series = newMemorySeries(m)
		s.fingerprintToSeries[fp] = series
		numSeries.Set(float64(len(s.fingerprintToSeries)))

		unarchived, err := s.persistence.UnarchiveMetric(fp)
		if err != nil {
			glog.Errorf("Error unarchiving fingerprint %v: %v", fp, err)
		}
		if unarchived {
			// The series existed before, had been archived at some
			// point, and has now been unarchived, i.e. it has
			// chunks on disk. Set chunkDescsLoaded accordingly so
			// that they will be looked at later.
			series.chunkDescsLoaded = false
		} else {
			// This was a genuinely new series, so index the metric.
			if err := s.persistence.IndexMetric(m); err != nil {
				glog.Errorf("Error indexing metric %v: %v", m, err)
			}
		}
	}
	return series
}

/*
func (s *memorySeriesStorage) preloadChunksAtTime(fp clientmodel.Fingerprint, ts clientmodel.Timestamp) (chunkDescs, error) {
	series, ok := s.fingerprintToSeries[fp]
	if !ok {
		panic("requested preload for non-existent series")
	}
	return series.preloadChunksAtTime(ts, s.persistence)
}
*/

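// preloadChunksForRange loads the chunks of the series with the given
// fingerprint that are needed to cover the time range from 'from' through
// 'through', delegating the actual work to the series itself. It panics if
// the series is not currently in memory.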
func (s *memorySeriesStorage) preloadChunksForRange(fp clientmodel.Fingerprint, from clientmodel.Timestamp, through clientmodel.Timestamp) (chunkDescs, error) {
	s.mtx.RLock()
	series, ok := s.fingerprintToSeries[fp]
	s.mtx.RUnlock()

	if !ok {
		panic("requested preload for non-existent series")
	}
	return series.preloadChunksForRange(from, through, s.persistence)
}

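// NewIterator returns a SeriesIterator for the series with the given
// fingerprint. It panics if the series is not currently in memory.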
func (s *memorySeriesStorage) NewIterator(fp clientmodel.Fingerprint) SeriesIterator {
	s.mtx.RLock()
	series, ok := s.fingerprintToSeries[fp]
	s.mtx.RUnlock()

	if !ok {
		panic("requested iterator for non-existent series")
	}
	return series.newIterator()
}

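// evictMemoryChunks asks every series currently in memory to evict chunks
// that are older than the given TTL, measured from now.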
func (s *memorySeriesStorage) evictMemoryChunks(ttl time.Duration) {
	s.mtx.RLock()
	defer s.mtx.RUnlock()

	for _, series := range s.fingerprintToSeries {
		series.evictOlderThan(clientmodel.TimestampFromTime(time.Now()).Add(-1 * ttl))
	}
}

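// recordPersist observes the latency of a single chunk persist operation in
// the persistLatencies metric, labeled by success or failure.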
func recordPersist(start time.Time, err error) {
	outcome := success
	if err != nil {
		outcome = failure
	}
	persistLatencies.WithLabelValues(outcome).Observe(float64(time.Since(start) / time.Millisecond))
}

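// handlePersistQueue is the persist loop. It consumes requests from the
// persist queue until the queue is closed, writes each queued chunk to disk
// (requeuing it on failure), and finally signals completion on persistDone.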
func (s *memorySeriesStorage) handlePersistQueue() {
	// TODO: Perhaps move this into Persistence?
	for req := range s.persistQueue {
		// TODO: Make this thread-safe?
		persistQueueLength.Set(float64(len(s.persistQueue)))

		//glog.Info("Persist request: ", *req.fingerprint)
		start := time.Now()
		err := s.persistence.PersistChunk(req.fingerprint, req.chunkDesc.chunk)
		recordPersist(start, err)
		if err != nil {
			glog.Error("Error persisting chunk, requeuing: ", err)
			s.persistQueue <- req
			continue
		}
		req.chunkDesc.unpin()
	}
	s.persistDone <- true
}

// Close stops serving, flushes all pending operations, and frees all resources.
func (s *memorySeriesStorage) Close() error {
	s.mtx.Lock()
	defer s.mtx.Unlock()

	if s.state == storageStopping {
		panic("Illegal State: Attempted to restop memorySeriesStorage.")
	}

	stopped := make(chan bool)
	glog.Info("Waiting for storage to stop serving...")
	s.stopServing <- stopped
	<-stopped
	glog.Info("Serving stopped.")

	glog.Info("Stopping persist loop...")
	close(s.persistQueue)
	<-s.persistDone
	glog.Info("Persist loop stopped.")

	glog.Info("Persisting head chunks...")
	if err := s.persistence.PersistSeriesMapAndHeads(s.fingerprintToSeries); err != nil {
		return err
	}
	glog.Info("Done persisting head chunks.")

	s.fingerprintToSeries = nil
	if err := s.persistence.Close(); err != nil {
		return err
	}
	s.state = storageStopping
	return nil
}

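// purgePeriodically runs purgeSeries for every known fingerprint on each tick
// of the purge ticker, aborting early when told to stop.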
func (s *memorySeriesStorage) purgePeriodically(stop <-chan bool) {
	purgeTicker := time.NewTicker(s.persistencePurgeInterval)
	defer purgeTicker.Stop()

	for {
		select {
		case <-stop:
			return
		case <-purgeTicker.C:
			glog.Info("Purging old series data...")
			s.mtx.RLock()
			fps := make([]clientmodel.Fingerprint, 0, len(s.fingerprintToSeries))
			for fp := range s.fingerprintToSeries {
				fps = append(fps, fp)
			}
			s.mtx.RUnlock()

			for _, fp := range fps {
				select {
				case <-stop:
					glog.Info("Interrupted running series purge.")
					return
				default:
					s.purgeSeries(fp)
				}
			}
			glog.Info("Done purging old series data.")
		}
	}
}

// purgeSeries purges chunks older than persistenceRetentionPeriod from a
// series. If the series contains no chunks after the purge, it is dropped
// entirely.
func (s *memorySeriesStorage) purgeSeries(fp clientmodel.Fingerprint) {
	ts := clientmodel.TimestampFromTime(time.Now()).Add(-1 * s.persistenceRetentionPeriod)

	s.mtx.Lock()
	// TODO: This lock is FAR too coarse! However, we cannot lock on the
	// memorySeries since there might be none (for series that are on disk
	// only), and we really don't want to un-archive a series from disk
	// while we are purging it at the same time. Per-fingerprint locking
	// would be nice. Or something... Have to think about it... Careful,
	// more race conditions lurk below. Also unsolved: chunks that are
	// still in the persist queue. persistence.DropChunks and
	// persistence.PersistChunk need to be locked on the fingerprint
	// level, or something. And even then, what happens if everything is
	// dropped, but there are still chunks hung in the persist queue? They
	// would later re-create a file for a series that doesn't exist
	// anymore...
	defer s.mtx.Unlock()

	// First purge persisted chunks. We need to do that anyway.
	allDropped, err := s.persistence.DropChunks(fp, ts)
	if err != nil {
		glog.Error("Error purging persisted chunks: ", err)
	}

	// Purge chunks from memory accordingly.
	if series, ok := s.fingerprintToSeries[fp]; ok {
		if series.purgeOlderThan(ts) {
			delete(s.fingerprintToSeries, fp)
			if err := s.persistence.UnindexMetric(series.metric); err != nil {
				glog.Errorf("Error unindexing metric %v: %v", series.metric, err)
			}
		}
		return
	}

	// If nothing was in memory, the metric must have been archived. Drop
	// the archived metric if there are no persisted chunks left.
	if !allDropped {
		return
	}
	if err := s.persistence.DropArchivedMetric(fp); err != nil {
		glog.Errorf("Error dropping archived metric for fingerprint %v: %v", fp, err)
	}
}

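// Serve transitions the storage into the serving state, starts the persist
// loop and the periodic purging, signals readiness on the started channel,
// and then evicts old chunks from memory on every eviction tick until it is
// asked to stop serving (via Close).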
func (s *memorySeriesStorage) Serve(started chan<- bool) {
	s.mtx.Lock()
	if s.state != storageStarting {
		panic("Illegal State: Attempted to restart memorySeriesStorage.")
	}
	s.state = storageServing
	s.mtx.Unlock()

	evictMemoryTicker := time.NewTicker(s.memoryEvictionInterval)
	defer evictMemoryTicker.Stop()

	go s.handlePersistQueue()

	stopPurge := make(chan bool)
	go s.purgePeriodically(stopPurge)

	started <- true
	for {
		select {
		case <-evictMemoryTicker.C:
			s.evictMemoryChunks(s.memoryRetentionPeriod)
		case stopped := <-s.stopServing:
			stopPurge <- true
			stopped <- true
			return
		}
	}
}

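// NewPreloader returns a Preloader operating on this storage.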
func (s *memorySeriesStorage) NewPreloader() Preloader {
	return &memorySeriesPreloader{
		storage: s,
	}
}

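// GetFingerprintsForLabelMatchers returns the fingerprints of all series that
// satisfy every one of the given label matchers. For each matcher, a
// candidate set is fetched from the persistence layer's indexes (directly for
// equality matchers, via the matching label values otherwise) and intersected
// with the result so far; an empty intersection short-circuits to nil.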
func (s *memorySeriesStorage) GetFingerprintsForLabelMatchers(labelMatchers metric.LabelMatchers) clientmodel.Fingerprints {
	s.mtx.RLock()
	defer s.mtx.RUnlock()

	var result map[clientmodel.Fingerprint]struct{}
	for _, matcher := range labelMatchers {
		intersection := map[clientmodel.Fingerprint]struct{}{}
		switch matcher.Type {
		case metric.Equal:
			fps, err := s.persistence.GetFingerprintsForLabelPair(
				metric.LabelPair{
					Name:  matcher.Name,
					Value: matcher.Value,
				},
			)
			if err != nil {
				glog.Error("Error getting fingerprints for label pair: ", err)
			}
			if len(fps) == 0 {
				return nil
			}
			for _, fp := range fps {
				if _, ok := result[fp]; ok || result == nil {
					intersection[fp] = struct{}{}
				}
			}
		default:
			values, err := s.persistence.GetLabelValuesForLabelName(matcher.Name)
			if err != nil {
				glog.Errorf("Error getting label values for label name %q: %v", matcher.Name, err)
			}
			matches := matcher.Filter(values)
			if len(matches) == 0 {
				return nil
			}
			for _, v := range matches {
				fps, err := s.persistence.GetFingerprintsForLabelPair(
					metric.LabelPair{
						Name:  matcher.Name,
						Value: v,
					},
				)
				if err != nil {
					glog.Error("Error getting fingerprints for label pair: ", err)
				}
				for _, fp := range fps {
					if _, ok := result[fp]; ok || result == nil {
						intersection[fp] = struct{}{}
					}
				}
			}
		}
		if len(intersection) == 0 {
			return nil
		}
		result = intersection
	}

	fps := make(clientmodel.Fingerprints, 0, len(result))
	for fp := range result {
		fps = append(fps, fp)
	}
	return fps
}

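// GetLabelValuesForLabelName returns all label values known for the given
// label name, as recorded in the persistence layer's index.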
func (s *memorySeriesStorage) GetLabelValuesForLabelName(labelName clientmodel.LabelName) clientmodel.LabelValues {
	s.mtx.RLock()
	defer s.mtx.RUnlock()

	lvs, err := s.persistence.GetLabelValuesForLabelName(labelName)
	if err != nil {
		glog.Errorf("Error getting label values for label name %q: %v", labelName, err)
	}
	return lvs
}

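// GetMetricForFingerprint returns the metric for the given fingerprint,
// preferring the series held in memory and falling back to the archived
// metrics on disk.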
func (s *memorySeriesStorage) GetMetricForFingerprint(fp clientmodel.Fingerprint) clientmodel.Metric {
	s.mtx.RLock()
	defer s.mtx.RUnlock()

	series, ok := s.fingerprintToSeries[fp]
	if ok {
		// TODO: Does this have to be a copy? Ask Julius!
		return series.metric
	}
	metric, err := s.persistence.GetArchivedMetric(fp)
	if err != nil {
		glog.Errorf("Error retrieving archived metric for fingerprint %v: %v", fp, err)
	}
	return metric
}

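// GetAllValuesForLabel returns the distinct values of the given label name
// across all series currently held in memory.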
func (s *memorySeriesStorage) GetAllValuesForLabel(labelName clientmodel.LabelName) clientmodel.LabelValues {
	s.mtx.RLock()
	defer s.mtx.RUnlock()

	var values clientmodel.LabelValues
	valueSet := map[clientmodel.LabelValue]struct{}{}
	for _, series := range s.fingerprintToSeries {
		if value, ok := series.metric[labelName]; ok {
			if _, ok := valueSet[value]; !ok {
				values = append(values, value)
				valueSet[value] = struct{}{}
			}
		}
	}
	return values
}