prometheus/storage/local/storage.go

1084 lines
33 KiB
Go
Raw Normal View History

// Copyright 2014 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package local contains the local time series storage used by Prometheus.
package local
import (
"container/list"
"sync"
"sync/atomic"
"time"
"github.com/golang/glog"
"github.com/prometheus/client_golang/prometheus"
clientmodel "github.com/prometheus/client_golang/model"
"github.com/prometheus/prometheus/storage/metric"
)
const (
evictRequestsCap = 1024
chunkLen = 1024
// See waitForNextFP.
fpMaxWaitDuration = 10 * time.Second
fpMinWaitDuration = 20 * time.Millisecond // A small multiple of disk seek time.
fpMaxSweepTime = 6 * time.Hour
maxEvictInterval = time.Minute
headChunkTimeout = time.Hour // Close head chunk if not touched for that long.
appendWorkers = 8 // Should be enough to not make appending a bottleneck.
appendQueueCap = 2 * appendWorkers
)
type storageState uint
const (
storageStarting storageState = iota
storageServing
storageStopping
)
type persistRequest struct {
fingerprint clientmodel.Fingerprint
chunkDesc *chunkDesc
}
type evictRequest struct {
cd *chunkDesc
evict bool
}
type memorySeriesStorage struct {
fpLocker *fingerprintLocker
fpToSeries *seriesMap
loopStopping, loopStopped chan struct{}
maxMemoryChunks int
dropAfter time.Duration
checkpointInterval time.Duration
checkpointDirtySeriesLimit int
appendQueue chan *clientmodel.Sample
appendLastTimestamp clientmodel.Timestamp // The timestamp of the last sample sent to the append queue.
appendWaitGroup sync.WaitGroup // To wait for all appended samples to be processed.
persistQueue chan persistRequest
persistQueueCap int // Not actually the cap of above channel. See handlePersistQueue.
persistStopped chan struct{}
persistence *persistence
countPersistedHeadChunks chan struct{}
evictList *list.List
evictRequests chan evictRequest
evictStopping, evictStopped chan struct{}
persistLatency prometheus.Summary
persistErrors prometheus.Counter
persistQueueCapacity prometheus.Metric
persistQueueLength prometheus.Gauge
numSeries prometheus.Gauge
seriesOps *prometheus.CounterVec
ingestedSamplesCount prometheus.Counter
invalidPreloadRequestsCount prometheus.Counter
}
// MemorySeriesStorageOptions contains options needed by
// NewMemorySeriesStorage. It is not safe to leave any of those at their zero
// values.
type MemorySeriesStorageOptions struct {
MemoryChunks int // How many chunks to keep in memory.
PersistenceStoragePath string // Location of persistence files.
PersistenceRetentionPeriod time.Duration // Chunks at least that old are dropped.
PersistenceQueueCapacity int // Capacity of queue for chunks to be persisted.
CheckpointInterval time.Duration // How often to checkpoint the series map and head chunks.
CheckpointDirtySeriesLimit int // How many dirty series will trigger an early checkpoint.
Dirty bool // Force the storage to consider itself dirty on startup.
}
// NewMemorySeriesStorage returns a newly allocated Storage. Storage.Serve still
// has to be called to start the storage.
func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) (Storage, error) {
p, err := newPersistence(o.PersistenceStoragePath, chunkLen, o.Dirty)
if err != nil {
return nil, err
}
glog.Info("Loading series map and head chunks...")
fpToSeries, err := p.loadSeriesMapAndHeads()
if err != nil {
return nil, err
}
glog.Infof("%d series loaded.", fpToSeries.length())
numSeries := prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "memory_series",
Help: "The current number of series in memory.",
})
numSeries.Set(float64(fpToSeries.length()))
s := &memorySeriesStorage{
fpLocker: newFingerprintLocker(1024),
fpToSeries: fpToSeries,
loopStopping: make(chan struct{}),
loopStopped: make(chan struct{}),
maxMemoryChunks: o.MemoryChunks,
dropAfter: o.PersistenceRetentionPeriod,
checkpointInterval: o.CheckpointInterval,
checkpointDirtySeriesLimit: o.CheckpointDirtySeriesLimit,
appendLastTimestamp: clientmodel.Earliest,
appendQueue: make(chan *clientmodel.Sample, appendQueueCap),
// The actual buffering happens within handlePersistQueue, so
// cap of persistQueue just has to be enough to not block while
// handlePersistQueue is writing to disk (20ms or so).
persistQueue: make(chan persistRequest, 1024),
persistQueueCap: o.PersistenceQueueCapacity,
persistStopped: make(chan struct{}),
persistence: p,
countPersistedHeadChunks: make(chan struct{}, 100),
evictList: list.New(),
evictRequests: make(chan evictRequest, evictRequestsCap),
evictStopping: make(chan struct{}),
evictStopped: make(chan struct{}),
persistLatency: prometheus.NewSummary(prometheus.SummaryOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "persist_latency_microseconds",
Help: "A summary of latencies for persisting each chunk.",
}),
persistErrors: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "persist_errors_total",
Help: "The total number of errors while persisting chunks.",
}),
persistQueueCapacity: prometheus.MustNewConstMetric(
prometheus.NewDesc(
prometheus.BuildFQName(namespace, subsystem, "persist_queue_capacity"),
"The total capacity of the persist queue.",
nil, nil,
),
prometheus.GaugeValue, float64(o.PersistenceQueueCapacity),
),
persistQueueLength: prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "persist_queue_length",
Help: "The current number of chunks waiting in the persist queue.",
}),
numSeries: numSeries,
seriesOps: prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "series_ops_total",
Help: "The total number of series operations by their type.",
},
[]string{opTypeLabel},
),
ingestedSamplesCount: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "ingested_samples_total",
Help: "The total number of samples ingested.",
}),
invalidPreloadRequestsCount: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "invalid_preload_requests_total",
Help: "The total number of preload requests referring to a non-existent series. This is an indication of outdated label indexes.",
}),
}
for i := 0; i < appendWorkers; i++ {
go func() {
for sample := range s.appendQueue {
s.appendSample(sample)
s.appendWaitGroup.Done()
}
}()
}
return s, nil
}
// Start implements Storage.
func (s *memorySeriesStorage) Start() {
go s.handleEvictList()
go s.handlePersistQueue()
go s.loop()
}
// Stop implements Storage.
func (s *memorySeriesStorage) Stop() error {
glog.Info("Stopping local storage...")
glog.Info("Draining append queue...")
close(s.appendQueue)
s.appendWaitGroup.Wait()
glog.Info("Append queue drained.")
glog.Info("Stopping maintenance loop...")
close(s.loopStopping)
<-s.loopStopped
glog.Info("Stopping persist queue...")
close(s.persistQueue)
<-s.persistStopped
glog.Info("Stopping chunk eviction...")
close(s.evictStopping)
<-s.evictStopped
// One final checkpoint of the series map and the head chunks.
if err := s.persistence.checkpointSeriesMapAndHeads(s.fpToSeries, s.fpLocker); err != nil {
return err
}
if err := s.persistence.close(); err != nil {
return err
}
glog.Info("Local storage stopped.")
return nil
}
// WaitForIndexing implements Storage.
func (s *memorySeriesStorage) WaitForIndexing() {
// First let all goroutines appending samples stop.
s.appendWaitGroup.Wait()
// Only then wait for the persistence to index them.
s.persistence.waitForIndexing()
}
// NewIterator implements storage.
func (s *memorySeriesStorage) NewIterator(fp clientmodel.Fingerprint) SeriesIterator {
s.fpLocker.Lock(fp)
defer s.fpLocker.Unlock(fp)
series, ok := s.fpToSeries.get(fp)
if !ok {
// Oops, no series for fp found. That happens if, after
// preloading is done, the whole series is identified as old
// enough for purging and hence purged for good. As there is no
// data left to iterate over, return an iterator that will never
// return any values.
return nopSeriesIterator{}
}
return series.newIterator(
func() { s.fpLocker.Lock(fp) },
func() { s.fpLocker.Unlock(fp) },
)
}
// NewPreloader implements Storage.
func (s *memorySeriesStorage) NewPreloader() Preloader {
return &memorySeriesPreloader{
storage: s,
}
}
// GetFingerprintsForLabelMatchers implements Storage.
func (s *memorySeriesStorage) GetFingerprintsForLabelMatchers(labelMatchers metric.LabelMatchers) clientmodel.Fingerprints {
var result map[clientmodel.Fingerprint]struct{}
for _, matcher := range labelMatchers {
intersection := map[clientmodel.Fingerprint]struct{}{}
switch matcher.Type {
case metric.Equal:
fps, err := s.persistence.getFingerprintsForLabelPair(
metric.LabelPair{
Name: matcher.Name,
Value: matcher.Value,
},
)
if err != nil {
glog.Error("Error getting fingerprints for label pair: ", err)
}
if len(fps) == 0 {
return nil
}
for _, fp := range fps {
if _, ok := result[fp]; ok || result == nil {
intersection[fp] = struct{}{}
}
}
default:
values, err := s.persistence.getLabelValuesForLabelName(matcher.Name)
if err != nil {
glog.Errorf("Error getting label values for label name %q: %v", matcher.Name, err)
}
matches := matcher.Filter(values)
if len(matches) == 0 {
return nil
}
for _, v := range matches {
fps, err := s.persistence.getFingerprintsForLabelPair(
metric.LabelPair{
Name: matcher.Name,
Value: v,
},
)
if err != nil {
glog.Error("Error getting fingerprints for label pair: ", err)
}
for _, fp := range fps {
if _, ok := result[fp]; ok || result == nil {
intersection[fp] = struct{}{}
}
}
}
}
if len(intersection) == 0 {
return nil
}
result = intersection
}
fps := make(clientmodel.Fingerprints, 0, len(result))
for fp := range result {
fps = append(fps, fp)
}
return fps
}
// GetLabelValuesForLabelName implements Storage.
func (s *memorySeriesStorage) GetLabelValuesForLabelName(labelName clientmodel.LabelName) clientmodel.LabelValues {
lvs, err := s.persistence.getLabelValuesForLabelName(labelName)
if err != nil {
glog.Errorf("Error getting label values for label name %q: %v", labelName, err)
}
return lvs
}
// GetMetricForFingerprint implements Storage.
func (s *memorySeriesStorage) GetMetricForFingerprint(fp clientmodel.Fingerprint) clientmodel.COWMetric {
s.fpLocker.Lock(fp)
defer s.fpLocker.Unlock(fp)
series, ok := s.fpToSeries.get(fp)
if ok {
// Wrap the returned metric in a copy-on-write (COW) metric here because
// the caller might mutate it.
return clientmodel.COWMetric{
Metric: series.metric,
}
}
metric, err := s.persistence.getArchivedMetric(fp)
if err != nil {
glog.Errorf("Error retrieving archived metric for fingerprint %v: %v", fp, err)
}
return clientmodel.COWMetric{
Metric: metric,
}
}
// AppendSamples implements Storage.
func (s *memorySeriesStorage) AppendSamples(samples clientmodel.Samples) {
for _, sample := range samples {
if sample.Timestamp != s.appendLastTimestamp {
// Timestamp has changed. We have to wait for processing
// of all appended samples before proceeding. Otherwise,
// we might violate the storage contract that each
// sample appended to a given series has to have a
// timestamp greater or equal to the previous sample
// appended to that series.
s.appendWaitGroup.Wait()
s.appendLastTimestamp = sample.Timestamp
}
s.appendWaitGroup.Add(1)
s.appendQueue <- sample
}
}
func (s *memorySeriesStorage) appendSample(sample *clientmodel.Sample) {
fp := sample.Metric.Fingerprint()
s.fpLocker.Lock(fp)
series := s.getOrCreateSeries(fp, sample.Metric)
chunkDescsToPersist := series.add(fp, &metric.SamplePair{
Value: sample.Value,
Timestamp: sample.Timestamp,
})
s.fpLocker.Unlock(fp)
s.ingestedSamplesCount.Inc()
if len(chunkDescsToPersist) == 0 {
return
}
// Queue only outside of the locked area, processing the persistQueue
// requires the same lock!
for _, cd := range chunkDescsToPersist {
s.persistQueue <- persistRequest{fp, cd}
}
// Count that a head chunk was persisted, but only best effort, i.e. we
// don't want to block here.
select {
case s.countPersistedHeadChunks <- struct{}{}: // Counted.
default: // Meh...
}
}
func (s *memorySeriesStorage) getOrCreateSeries(fp clientmodel.Fingerprint, m clientmodel.Metric) *memorySeries {
series, ok := s.fpToSeries.get(fp)
if !ok {
unarchived, firstTime, err := s.persistence.unarchiveMetric(fp)
if err != nil {
glog.Errorf("Error unarchiving fingerprint %v: %v", fp, err)
}
if unarchived {
s.seriesOps.WithLabelValues(unarchive).Inc()
} else {
// This was a genuinely new series, so index the metric.
s.persistence.indexMetric(fp, m)
s.seriesOps.WithLabelValues(create).Inc()
}
series = newMemorySeries(m, !unarchived, firstTime)
s.fpToSeries.put(fp, series)
s.numSeries.Inc()
}
return series
}
func (s *memorySeriesStorage) preloadChunksForRange(
fp clientmodel.Fingerprint,
from clientmodel.Timestamp, through clientmodel.Timestamp,
stalenessDelta time.Duration,
) ([]*chunkDesc, error) {
s.fpLocker.Lock(fp)
defer s.fpLocker.Unlock(fp)
series, ok := s.fpToSeries.get(fp)
if !ok {
has, first, last, err := s.persistence.hasArchivedMetric(fp)
if err != nil {
return nil, err
}
if !has {
s.invalidPreloadRequestsCount.Inc()
return nil, nil
}
if from.Add(-stalenessDelta).Before(last) && through.Add(stalenessDelta).After(first) {
metric, err := s.persistence.getArchivedMetric(fp)
if err != nil {
return nil, err
}
series = s.getOrCreateSeries(fp, metric)
} else {
return nil, nil
}
}
return series.preloadChunksForRange(from, through, fp, s)
}
func (s *memorySeriesStorage) handleEvictList() {
ticker := time.NewTicker(maxEvictInterval)
count := 0
2015-01-07 10:02:38 -08:00
for {
// To batch up evictions a bit, this tries evictions at least
// once per evict interval, but earlier if the number of evict
// requests with evict==true that have happened since the last
// evict run is more than maxMemoryChunks/1000.
select {
case req := <-s.evictRequests:
if req.evict {
req.cd.evictListElement = s.evictList.PushBack(req.cd)
count++
if count > s.maxMemoryChunks/1000 {
s.maybeEvict()
count = 0
}
} else {
if req.cd.evictListElement != nil {
s.evictList.Remove(req.cd.evictListElement)
req.cd.evictListElement = nil
}
}
case <-ticker.C:
if s.evictList.Len() > 0 {
s.maybeEvict()
}
case <-s.evictStopping:
// Drain evictRequests forever in a goroutine to not let
// requesters hang.
go func() {
for {
<-s.evictRequests
2015-01-07 10:02:38 -08:00
}
}()
ticker.Stop()
glog.Info("Chunk eviction stopped.")
close(s.evictStopped)
return
}
}
}
// maybeEvict is a local helper method. Must only be called by handleEvictList.
func (s *memorySeriesStorage) maybeEvict() {
numChunksToEvict := int(atomic.LoadInt64(&numMemChunks)) - s.maxMemoryChunks
if numChunksToEvict <= 0 {
return
}
chunkDescsToEvict := make([]*chunkDesc, numChunksToEvict)
for i := range chunkDescsToEvict {
e := s.evictList.Front()
if e == nil {
break
}
cd := e.Value.(*chunkDesc)
cd.evictListElement = nil
chunkDescsToEvict[i] = cd
s.evictList.Remove(e)
}
// Do the actual eviction in a goroutine as we might otherwise deadlock,
// in the following way: A chunk was unpinned completely and therefore
// scheduled for eviction. At the time we actually try to evict it,
// another goroutine is pinning the chunk. The pinning goroutine has
// currently locked the chunk and tries to send the evict request (to
// remove the chunk from the evict list) to the evictRequests
// channel. The send blocks because evictRequests is full. However, the
// goroutine that is supposed to empty the channel is waiting for the
// chunkDesc lock to try to evict the chunk.
go func() {
for _, cd := range chunkDescsToEvict {
if cd == nil {
break
}
cd.maybeEvict()
// We don't care if the eviction succeeds. If the chunk
// was pinned in the meantime, it will be added to the
// evict list once it gets unpinned again.
}
}()
}
func (s *memorySeriesStorage) handlePersistQueue() {
chunkMaps := chunkMaps{}
chunkCount := 0
persistMostConsecutiveChunks := func() {
fp, cds := chunkMaps.pop()
if err := s.persistChunks(fp, cds); err != nil {
// Need to put chunks back for retry.
for _, cd := range cds {
chunkMaps.add(fp, cd)
}
return
}
chunkCount -= len(cds)
}
loop:
for {
if chunkCount >= s.persistQueueCap && chunkCount > 0 {
glog.Warningf("%d chunks queued for persistence. Ingestion pipeline will backlog.", chunkCount)
persistMostConsecutiveChunks()
}
select {
case req, ok := <-s.persistQueue:
if !ok {
break loop
}
chunkMaps.add(req.fingerprint, req.chunkDesc)
chunkCount++
default:
if chunkCount > 0 {
persistMostConsecutiveChunks()
continue loop
}
// If we are here, there is nothing to do right now. So
// just wait for a persist request to come in.
req, ok := <-s.persistQueue
if !ok {
break loop
}
chunkMaps.add(req.fingerprint, req.chunkDesc)
chunkCount++
}
s.persistQueueLength.Set(float64(chunkCount))
}
// Drain all requests.
for _, m := range chunkMaps {
for fp, cds := range m {
if s.persistChunks(fp, cds) == nil {
chunkCount -= len(cds)
if (chunkCount+len(cds))/1000 > chunkCount/1000 {
glog.Infof(
"Still draining persist queue, %d chunks left to persist...",
chunkCount,
)
}
s.persistQueueLength.Set(float64(chunkCount))
}
}
}
glog.Info("Persist queue drained and stopped.")
close(s.persistStopped)
}
func (s *memorySeriesStorage) persistChunks(fp clientmodel.Fingerprint, cds []*chunkDesc) error {
start := time.Now()
chunks := make([]chunk, len(cds))
for i, cd := range cds {
chunks[i] = cd.chunk
}
s.fpLocker.Lock(fp)
offset, err := s.persistence.persistChunks(fp, chunks)
if series, seriesInMemory := s.fpToSeries.get(fp); err == nil && seriesInMemory && series.chunkDescsOffset == -1 {
// This is the first chunk persisted for a newly created
// series that had prior chunks on disk. Finally, we can
// set the chunkDescsOffset.
series.chunkDescsOffset = offset
}
s.fpLocker.Unlock(fp)
s.persistLatency.Observe(float64(time.Since(start)) / float64(time.Microsecond))
if err != nil {
s.persistErrors.Inc()
glog.Error("Error persisting chunks: ", err)
s.persistence.setDirty(true)
return err
}
for _, cd := range cds {
cd.unpin(s.evictRequests)
}
chunkOps.WithLabelValues(persistAndUnpin).Add(float64(len(cds)))
return nil
}
// waitForNextFP waits an estimated duration, after which we want to process
// another fingerprint so that we will process all fingerprints in a tenth of
// s.dropAfter assuming that the system is doing nothing else, e.g. if we want
// to drop chunks after 40h, we want to cycle through all fingerprints within
// 4h. However, the maximum sweep time is capped at fpMaxSweepTime. Furthermore,
// this method will always wait for at least fpMinWaitDuration and never longer
// than fpMaxWaitDuration. If s.loopStopped is closed, it will return false
// immediately. The estimation is based on the total number of fingerprints as
// passed in.
func (s *memorySeriesStorage) waitForNextFP(numberOfFPs int) bool {
d := fpMaxWaitDuration
if numberOfFPs != 0 {
sweepTime := s.dropAfter / 10
if sweepTime > fpMaxSweepTime {
sweepTime = fpMaxSweepTime
}
d = sweepTime / time.Duration(numberOfFPs)
if d < fpMinWaitDuration {
d = fpMinWaitDuration
}
if d > fpMaxWaitDuration {
d = fpMaxWaitDuration
}
}
t := time.NewTimer(d)
select {
case <-t.C:
return true
case <-s.loopStopping:
return false
}
}
// cycleThroughMemoryFingerprints returns a channel that emits fingerprints for
// series in memory in a throttled fashion. It continues to cycle through all
// fingerprints in memory until s.loopStopping is closed.
func (s *memorySeriesStorage) cycleThroughMemoryFingerprints() chan clientmodel.Fingerprint {
memoryFingerprints := make(chan clientmodel.Fingerprint)
go func() {
var fpIter <-chan clientmodel.Fingerprint
defer func() {
if fpIter != nil {
for range fpIter {
// Consume the iterator.
}
}
close(memoryFingerprints)
}()
for {
// Initial wait, also important if there are no FPs yet.
if !s.waitForNextFP(s.fpToSeries.length()) {
return
}
begin := time.Now()
fpIter = s.fpToSeries.fpIter()
count := 0
for fp := range fpIter {
select {
case memoryFingerprints <- fp:
case <-s.loopStopping:
return
}
s.waitForNextFP(s.fpToSeries.length())
count++
}
if count > 0 {
glog.Infof(
"Completed maintenance sweep through %d in-memory fingerprints in %v.",
count, time.Since(begin),
)
}
}
}()
return memoryFingerprints
}
// cycleThroughArchivedFingerprints returns a channel that emits fingerprints
// for archived series in a throttled fashion. It continues to cycle through all
// archived fingerprints until s.loopStopping is closed.
func (s *memorySeriesStorage) cycleThroughArchivedFingerprints() chan clientmodel.Fingerprint {
archivedFingerprints := make(chan clientmodel.Fingerprint)
go func() {
defer close(archivedFingerprints)
for {
archivedFPs, err := s.persistence.getFingerprintsModifiedBefore(
clientmodel.TimestampFromTime(time.Now()).Add(-s.dropAfter),
)
if err != nil {
glog.Error("Failed to lookup archived fingerprint ranges: ", err)
s.waitForNextFP(0)
continue
}
// Initial wait, also important if there are no FPs yet.
if !s.waitForNextFP(len(archivedFPs)) {
return
}
begin := time.Now()
for _, fp := range archivedFPs {
select {
case archivedFingerprints <- fp:
case <-s.loopStopping:
return
}
s.waitForNextFP(len(archivedFPs))
}
if len(archivedFPs) > 0 {
glog.Infof(
"Completed maintenance sweep through %d archived fingerprints in %v.",
len(archivedFPs), time.Since(begin),
)
}
}
}()
return archivedFingerprints
}
func (s *memorySeriesStorage) loop() {
checkpointTimer := time.NewTimer(s.checkpointInterval)
// We take the number of head chunks persisted since the last checkpoint
// as an approximation for the number of series that are "dirty",
// i.e. whose head chunk is different from the one in the most recent
// checkpoint or for which the fact that the head chunk has been
// persisted is not reflected in the most recent checkpoint. This count
// could overestimate the number of dirty series, but it's good enough
2015-01-09 02:04:20 -08:00
// as a heuristic.
headChunksPersistedSinceLastCheckpoint := 0
defer func() {
checkpointTimer.Stop()
glog.Info("Maintenance loop stopped.")
close(s.loopStopped)
}()
memoryFingerprints := s.cycleThroughMemoryFingerprints()
archivedFingerprints := s.cycleThroughArchivedFingerprints()
loop:
for {
select {
case <-s.loopStopping:
break loop
case <-checkpointTimer.C:
s.persistence.checkpointSeriesMapAndHeads(s.fpToSeries, s.fpLocker)
headChunksPersistedSinceLastCheckpoint = 0
checkpointTimer.Reset(s.checkpointInterval)
case fp := <-memoryFingerprints:
s.maintainMemorySeries(fp, clientmodel.TimestampFromTime(time.Now()).Add(-s.dropAfter))
case fp := <-archivedFingerprints:
s.maintainArchivedSeries(fp, clientmodel.TimestampFromTime(time.Now()).Add(-s.dropAfter))
case <-s.countPersistedHeadChunks:
headChunksPersistedSinceLastCheckpoint++
// Check if we have enough "dirty" series so that we need an early checkpoint.
// As described above, we take the headChunksPersistedSinceLastCheckpoint as a
// heuristic for "dirty" series. However, if we are already backlogging
// chunks to be persisted, creating a checkpoint would be counterproductive,
// as it would slow down chunk persisting even more, while in a situation like
// that, the best we can do for crash recovery is to work through the persist
// queue as quickly as possible. So only checkpoint if s.persistQueue is
// at most 20% full.
if headChunksPersistedSinceLastCheckpoint >= s.checkpointDirtySeriesLimit &&
len(s.persistQueue) < cap(s.persistQueue)/5 {
checkpointTimer.Reset(0)
}
}
}
// Wait until both channels are closed.
for range memoryFingerprints {
}
for range archivedFingerprints {
}
}
// maintainMemorySeries first purges the series from old chunks. If the series
// still exists after that, it proceeds with the following steps: It closes the
// head chunk if it was not touched in a while. It archives a series if all
// chunks are evicted. It evicts chunkDescs if there are too many.
func (s *memorySeriesStorage) maintainMemorySeries(fp clientmodel.Fingerprint, beforeTime clientmodel.Timestamp) {
var headChunkToPersist *chunkDesc
s.fpLocker.Lock(fp)
defer func() {
s.fpLocker.Unlock(fp)
// Queue outside of lock!
if headChunkToPersist != nil {
s.persistQueue <- persistRequest{fp, headChunkToPersist}
// Count that a head chunk was persisted, but only best effort, i.e. we
// don't want to block here.
select {
case s.countPersistedHeadChunks <- struct{}{}: // Counted.
default: // Meh...
}
}
}()
series, ok := s.fpToSeries.get(fp)
if !ok {
// Series is actually not in memory, perhaps archived or dropped in the meantime.
return
}
defer s.seriesOps.WithLabelValues(memoryMaintenance).Inc()
if s.purgeMemorySeries(fp, series, beforeTime) {
// Series is gone now, we are done.
return
}
iOldestNotEvicted := -1
for i, cd := range series.chunkDescs {
if !cd.isEvicted() {
iOldestNotEvicted = i
break
}
}
// Archive if all chunks are evicted.
if iOldestNotEvicted == -1 {
s.fpToSeries.del(fp)
s.numSeries.Dec()
// Make sure we have a head chunk descriptor (a freshly
// unarchived series has none).
if len(series.chunkDescs) == 0 {
cds, err := s.loadChunkDescs(fp, clientmodel.Latest)
if err != nil {
glog.Errorf(
"Could not load chunk descriptors prior to archiving metric %v, metric will not be archived: %v",
series.metric, err,
)
return
}
series.chunkDescs = cds
}
if err := s.persistence.archiveMetric(
fp, series.metric, series.firstTime(), series.head().lastTime(),
); err != nil {
glog.Errorf("Error archiving metric %v: %v", series.metric, err)
return
}
s.seriesOps.WithLabelValues(archive).Inc()
return
}
// If we are here, the series is not archived, so check for chunkDesc
// eviction next and then if the head chunk needs to be persisted.
series.evictChunkDescs(iOldestNotEvicted)
if !series.headChunkPersisted && time.Now().Sub(series.head().lastTime().Time()) > headChunkTimeout {
series.headChunkPersisted = true
// Since we cannot modify the head chunk from now on, we
// don't need to bother with cloning anymore.
series.headChunkUsedByIterator = false
headChunkToPersist = series.head()
}
}
// purgeMemorySeries drops chunks older than beforeTime from the provided memory
// series. The caller must have locked fp. If the series contains no chunks
// after dropping old chunks, it is purged entirely. In that case, the method
// returns true.
func (s *memorySeriesStorage) purgeMemorySeries(fp clientmodel.Fingerprint, series *memorySeries, beforeTime clientmodel.Timestamp) bool {
if !series.firstTime().Before(beforeTime) {
// Oldest sample not old enough.
return false
}
newFirstTime, numDroppedFromPersistence, allDroppedFromPersistence, err := s.persistence.dropChunks(fp, beforeTime)
if err != nil {
glog.Error("Error dropping persisted chunks: ", err)
}
numDroppedFromMemory, allDroppedFromMemory := series.dropChunks(beforeTime)
if allDroppedFromPersistence && allDroppedFromMemory {
s.fpToSeries.del(fp)
s.numSeries.Dec()
s.seriesOps.WithLabelValues(memoryPurge).Inc()
s.persistence.unindexMetric(fp, series.metric)
return true
}
if series.chunkDescsOffset != -1 {
series.savedFirstTime = newFirstTime
series.chunkDescsOffset += numDroppedFromMemory - numDroppedFromPersistence
if series.chunkDescsOffset < 0 {
panic("dropped more chunks from persistence than from memory")
}
}
return false
}
// maintainArchivedSeries drops chunks older than beforeTime from an archived
// series. If the series contains no chunks after that, it is purged entirely.
func (s *memorySeriesStorage) maintainArchivedSeries(fp clientmodel.Fingerprint, beforeTime clientmodel.Timestamp) {
s.fpLocker.Lock(fp)
defer s.fpLocker.Unlock(fp)
has, firstTime, lastTime, err := s.persistence.hasArchivedMetric(fp)
if err != nil {
glog.Error("Error looking up archived time range: ", err)
return
}
if !has || !firstTime.Before(beforeTime) {
// Oldest sample not old enough, or metric purged or unarchived in the meantime.
return
}
defer s.seriesOps.WithLabelValues(archiveMaintenance).Inc()
newFirstTime, _, allDropped, err := s.persistence.dropChunks(fp, beforeTime)
if err != nil {
glog.Error("Error dropping persisted chunks: ", err)
}
if allDropped {
if err := s.persistence.purgeArchivedMetric(fp); err != nil {
glog.Errorf("Error purging archived metric for fingerprint %v: %v", fp, err)
return
}
s.seriesOps.WithLabelValues(archivePurge).Inc()
return
}
s.persistence.updateArchivedTimeRange(fp, newFirstTime, lastTime)
}
// See persistence.loadChunks for detailed explanation.
func (s *memorySeriesStorage) loadChunks(fp clientmodel.Fingerprint, indexes []int, indexOffset int) ([]chunk, error) {
return s.persistence.loadChunks(fp, indexes, indexOffset)
}
// See persistence.loadChunkDescs for detailed explanation.
func (s *memorySeriesStorage) loadChunkDescs(fp clientmodel.Fingerprint, beforeTime clientmodel.Timestamp) ([]*chunkDesc, error) {
return s.persistence.loadChunkDescs(fp, beforeTime)
}
// Describe implements prometheus.Collector.
func (s *memorySeriesStorage) Describe(ch chan<- *prometheus.Desc) {
s.persistence.Describe(ch)
ch <- s.persistLatency.Desc()
ch <- s.persistErrors.Desc()
ch <- s.persistQueueCapacity.Desc()
ch <- s.persistQueueLength.Desc()
ch <- s.numSeries.Desc()
s.seriesOps.Describe(ch)
ch <- s.ingestedSamplesCount.Desc()
ch <- s.invalidPreloadRequestsCount.Desc()
ch <- numMemChunksDesc
}
// Collect implements prometheus.Collector.
func (s *memorySeriesStorage) Collect(ch chan<- prometheus.Metric) {
s.persistence.Collect(ch)
ch <- s.persistLatency
ch <- s.persistErrors
ch <- s.persistQueueCapacity
ch <- s.persistQueueLength
ch <- s.numSeries
s.seriesOps.Collect(ch)
ch <- s.ingestedSamplesCount
ch <- s.invalidPreloadRequestsCount
ch <- prometheus.MustNewConstMetric(
numMemChunksDesc,
prometheus.GaugeValue,
float64(atomic.LoadInt64(&numMemChunks)))
}
// chunkMaps is a slice of maps with chunkDescs to be persisted.
// Each chunk map contains n consecutive chunks to persist, where
// n is the index+1.
type chunkMaps []map[clientmodel.Fingerprint][]*chunkDesc
// add adds a chunk to chunkMaps.
func (cm *chunkMaps) add(fp clientmodel.Fingerprint, cd *chunkDesc) {
// Runtime of this method is linear with the number of
// chunkMaps. However, we expect only ever very few maps.
numMaps := len(*cm)
for i, m := range *cm {
if cds, ok := m[fp]; ok {
// Found our fp! Add cd and level up.
cds = append(cds, cd)
delete(m, fp)
if i == numMaps-1 {
*cm = append(*cm, map[clientmodel.Fingerprint][]*chunkDesc{})
}
(*cm)[i+1][fp] = cds
return
}
}
// Our fp isn't contained in cm yet. Add it to the first map (and add a
// first map if there is none).
if numMaps == 0 {
*cm = chunkMaps{map[clientmodel.Fingerprint][]*chunkDesc{}}
}
(*cm)[0][fp] = []*chunkDesc{cd}
}
// pop retrieves and removes a fingerprint with all its chunks. It chooses one
// of the fingerprints with the most chunks. It panics if cm has no entries.
func (cm *chunkMaps) pop() (clientmodel.Fingerprint, []*chunkDesc) {
m := (*cm)[len(*cm)-1]
for fp, cds := range m {
delete(m, fp)
// Prune empty maps from top level.
for len(m) == 0 {
*cm = (*cm)[:len(*cm)-1]
if len(*cm) == 0 {
break
}
m = (*cm)[len(*cm)-1]
}
return fp, cds
}
panic("popped from empty chunkMaps")
}