prometheus/storage/local/storage.go
beorn7 1db7589081 Reduce the capacity of countPersistedHeadChunks.
The capacity is essentially how many persisted head chunks we will count
at most while doing other things, in particular checkpointing. Keep this
number low to limit the backlog of already-counted head chunks; otherwise,
we will easily checkpoint too often if checkpoints take a long time anyway.
2015-02-27 00:53:52 +01:00

// Copyright 2014 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package local contains the local time series storage used by Prometheus.
package local
import (
"container/list"
"sync"
"sync/atomic"
"time"
"github.com/golang/glog"
"github.com/prometheus/client_golang/prometheus"
clientmodel "github.com/prometheus/client_golang/model"
"github.com/prometheus/prometheus/storage/metric"
)
const (
evictRequestsCap = 1024
chunkLen = 1024
// See waitForNextFP.
fpMaxWaitDuration = 10 * time.Second
fpMinWaitDuration = 20 * time.Millisecond // A small multiple of disk seek time.
fpMaxSweepTime = 6 * time.Hour
maxEvictInterval = time.Minute
headChunkTimeout = time.Hour // Close head chunk if not touched for that long.
appendWorkers = 8 // Should be enough to not make appending a bottleneck.
appendQueueCap = 2 * appendWorkers
)
type storageState uint
const (
storageStarting storageState = iota
storageServing
storageStopping
)
type persistRequest struct {
fingerprint clientmodel.Fingerprint
chunkDesc *chunkDesc
}
type evictRequest struct {
cd *chunkDesc
evict bool
}
type memorySeriesStorage struct {
fpLocker *fingerprintLocker
fpToSeries *seriesMap
loopStopping, loopStopped chan struct{}
maxMemoryChunks int
dropAfter time.Duration
checkpointInterval time.Duration
checkpointDirtySeriesLimit int
appendQueue chan *clientmodel.Sample
appendLastTimestamp clientmodel.Timestamp // The timestamp of the last sample sent to the append queue.
appendWaitGroup sync.WaitGroup // To wait for all appended samples to be processed.
persistQueue chan persistRequest
persistQueueCap int // Not actually the capacity of the above channel. See handlePersistQueue.
persistStopped chan struct{}
persistence *persistence
countPersistedHeadChunks chan struct{}
evictList *list.List
evictRequests chan evictRequest
evictStopping, evictStopped chan struct{}
persistLatency prometheus.Summary
persistErrors prometheus.Counter
persistQueueCapacity prometheus.Metric
persistQueueLength prometheus.Gauge
numSeries prometheus.Gauge
seriesOps *prometheus.CounterVec
ingestedSamplesCount prometheus.Counter
invalidPreloadRequestsCount prometheus.Counter
}
// MemorySeriesStorageOptions contains options needed by
// NewMemorySeriesStorage. It is not safe to leave any of those at their zero
// values.
type MemorySeriesStorageOptions struct {
MemoryChunks int // How many chunks to keep in memory.
PersistenceStoragePath string // Location of persistence files.
PersistenceRetentionPeriod time.Duration // Chunks at least that old are dropped.
PersistenceQueueCapacity int // Capacity of queue for chunks to be persisted.
CheckpointInterval time.Duration // How often to checkpoint the series map and head chunks.
CheckpointDirtySeriesLimit int // How many dirty series will trigger an early checkpoint.
Dirty bool // Force the storage to consider itself dirty on startup.
}
// NewMemorySeriesStorage returns a newly allocated Storage. Storage.Start still
// has to be called to start the storage.
func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) (Storage, error) {
p, err := newPersistence(o.PersistenceStoragePath, chunkLen, o.Dirty)
if err != nil {
return nil, err
}
glog.Info("Loading series map and head chunks...")
fpToSeries, err := p.loadSeriesMapAndHeads()
if err != nil {
return nil, err
}
glog.Infof("%d series loaded.", fpToSeries.length())
numSeries := prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "memory_series",
Help: "The current number of series in memory.",
})
numSeries.Set(float64(fpToSeries.length()))
s := &memorySeriesStorage{
fpLocker: newFingerprintLocker(1024),
fpToSeries: fpToSeries,
loopStopping: make(chan struct{}),
loopStopped: make(chan struct{}),
maxMemoryChunks: o.MemoryChunks,
dropAfter: o.PersistenceRetentionPeriod,
checkpointInterval: o.CheckpointInterval,
checkpointDirtySeriesLimit: o.CheckpointDirtySeriesLimit,
appendLastTimestamp: clientmodel.Earliest,
appendQueue: make(chan *clientmodel.Sample, appendQueueCap),
// The actual buffering happens within handlePersistQueue, so
// cap of persistQueue just has to be enough to not block while
// handlePersistQueue is writing to disk (20ms or so).
persistQueue: make(chan persistRequest, 1024),
persistQueueCap: o.PersistenceQueueCapacity,
persistStopped: make(chan struct{}),
persistence: p,
countPersistedHeadChunks: make(chan struct{}, 100),
evictList: list.New(),
evictRequests: make(chan evictRequest, evictRequestsCap),
evictStopping: make(chan struct{}),
evictStopped: make(chan struct{}),
persistLatency: prometheus.NewSummary(prometheus.SummaryOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "persist_latency_microseconds",
Help: "A summary of latencies for persisting each chunk.",
}),
persistErrors: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "persist_errors_total",
Help: "The total number of errors while persisting chunks.",
}),
persistQueueCapacity: prometheus.MustNewConstMetric(
prometheus.NewDesc(
prometheus.BuildFQName(namespace, subsystem, "persist_queue_capacity"),
"The total capacity of the persist queue.",
nil, nil,
),
prometheus.GaugeValue, float64(o.PersistenceQueueCapacity),
),
persistQueueLength: prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "persist_queue_length",
Help: "The current number of chunks waiting in the persist queue.",
}),
numSeries: numSeries,
seriesOps: prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "series_ops_total",
Help: "The total number of series operations by their type.",
},
[]string{opTypeLabel},
),
ingestedSamplesCount: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "ingested_samples_total",
Help: "The total number of samples ingested.",
}),
invalidPreloadRequestsCount: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "invalid_preload_requests_total",
Help: "The total number of preload requests referring to a non-existent series. This is an indication of outdated label indexes.",
}),
}
for i := 0; i < appendWorkers; i++ {
go func() {
for sample := range s.appendQueue {
s.appendSample(sample)
s.appendWaitGroup.Done()
}
}()
}
return s, nil
}
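// exampleStorageUsage is a minimal usage sketch showing how a caller might
// wire up the storage via MemorySeriesStorageOptions. The function itself,
// the storage path, and all concrete option values are illustrative
// assumptions, not part of the storage API and not recommended defaults.
func exampleStorageUsage() error {
	storage, err := NewMemorySeriesStorage(&MemorySeriesStorageOptions{
		MemoryChunks:               1024 * 1024,
		PersistenceStoragePath:     "/tmp/prometheus-example", // Hypothetical path.
		PersistenceRetentionPeriod: 15 * 24 * time.Hour,
		PersistenceQueueCapacity:   32 * 1024,
		CheckpointInterval:         5 * time.Minute,
		CheckpointDirtySeriesLimit: 5000,
	})
	if err != nil {
		return err
	}
	storage.Start()
	defer storage.Stop()
	// Samples for the same series must be appended with non-decreasing
	// timestamps (see AppendSamples below).
	storage.AppendSamples(clientmodel.Samples{
		&clientmodel.Sample{
			Metric:    clientmodel.Metric{clientmodel.MetricNameLabel: "example_metric"},
			Value:     42,
			Timestamp: clientmodel.TimestampFromTime(time.Now()),
		},
	})
	storage.WaitForIndexing()
	return nil
}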
// Start implements Storage.
func (s *memorySeriesStorage) Start() {
go s.handleEvictList()
go s.handlePersistQueue()
go s.loop()
}
// Stop implements Storage.
func (s *memorySeriesStorage) Stop() error {
glog.Info("Stopping local storage...")
glog.Info("Draining append queue...")
close(s.appendQueue)
s.appendWaitGroup.Wait()
glog.Info("Append queue drained.")
glog.Info("Stopping maintenance loop...")
close(s.loopStopping)
<-s.loopStopped
glog.Info("Stopping persist queue...")
close(s.persistQueue)
<-s.persistStopped
glog.Info("Stopping chunk eviction...")
close(s.evictStopping)
<-s.evictStopped
// One final checkpoint of the series map and the head chunks.
if err := s.persistence.checkpointSeriesMapAndHeads(s.fpToSeries, s.fpLocker); err != nil {
return err
}
if err := s.persistence.close(); err != nil {
return err
}
glog.Info("Local storage stopped.")
return nil
}
// WaitForIndexing implements Storage.
func (s *memorySeriesStorage) WaitForIndexing() {
// First let all goroutines appending samples stop.
s.appendWaitGroup.Wait()
// Only then wait for the persistence to index them.
s.persistence.waitForIndexing()
}
// NewIterator implements Storage.
func (s *memorySeriesStorage) NewIterator(fp clientmodel.Fingerprint) SeriesIterator {
s.fpLocker.Lock(fp)
defer s.fpLocker.Unlock(fp)
series, ok := s.fpToSeries.get(fp)
if !ok {
// Oops, no series for fp found. That happens if, after
// preloading is done, the whole series is identified as old
// enough for purging and hence purged for good. As there is no
// data left to iterate over, return an iterator that will never
// return any values.
return nopSeriesIterator{}
}
return series.newIterator(
func() { s.fpLocker.Lock(fp) },
func() { s.fpLocker.Unlock(fp) },
)
}
// NewPreloader implements Storage.
func (s *memorySeriesStorage) NewPreloader() Preloader {
return &memorySeriesPreloader{
storage: s,
}
}
// GetFingerprintsForLabelMatchers implements Storage.
func (s *memorySeriesStorage) GetFingerprintsForLabelMatchers(labelMatchers metric.LabelMatchers) clientmodel.Fingerprints {
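	// result is narrowed down successively: after each matcher, it holds
	// the intersection of the fingerprints matching all matchers processed
	// so far. A nil result means that no matcher has contributed
	// fingerprints yet, which is why the membership checks below accept a
	// fingerprint if it is already in result OR result is still nil.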
var result map[clientmodel.Fingerprint]struct{}
for _, matcher := range labelMatchers {
intersection := map[clientmodel.Fingerprint]struct{}{}
switch matcher.Type {
case metric.Equal:
fps, err := s.persistence.getFingerprintsForLabelPair(
metric.LabelPair{
Name: matcher.Name,
Value: matcher.Value,
},
)
if err != nil {
glog.Error("Error getting fingerprints for label pair: ", err)
}
if len(fps) == 0 {
return nil
}
for _, fp := range fps {
if _, ok := result[fp]; ok || result == nil {
intersection[fp] = struct{}{}
}
}
default:
values, err := s.persistence.getLabelValuesForLabelName(matcher.Name)
if err != nil {
glog.Errorf("Error getting label values for label name %q: %v", matcher.Name, err)
}
matches := matcher.Filter(values)
if len(matches) == 0 {
return nil
}
for _, v := range matches {
fps, err := s.persistence.getFingerprintsForLabelPair(
metric.LabelPair{
Name: matcher.Name,
Value: v,
},
)
if err != nil {
glog.Error("Error getting fingerprints for label pair: ", err)
}
for _, fp := range fps {
if _, ok := result[fp]; ok || result == nil {
intersection[fp] = struct{}{}
}
}
}
}
if len(intersection) == 0 {
return nil
}
result = intersection
}
fps := make(clientmodel.Fingerprints, 0, len(result))
for fp := range result {
fps = append(fps, fp)
}
return fps
}
// GetLabelValuesForLabelName implements Storage.
func (s *memorySeriesStorage) GetLabelValuesForLabelName(labelName clientmodel.LabelName) clientmodel.LabelValues {
lvs, err := s.persistence.getLabelValuesForLabelName(labelName)
if err != nil {
glog.Errorf("Error getting label values for label name %q: %v", labelName, err)
}
return lvs
}
// GetMetricForFingerprint implements Storage.
func (s *memorySeriesStorage) GetMetricForFingerprint(fp clientmodel.Fingerprint) clientmodel.COWMetric {
s.fpLocker.Lock(fp)
defer s.fpLocker.Unlock(fp)
series, ok := s.fpToSeries.get(fp)
if ok {
// Wrap the returned metric in a copy-on-write (COW) metric here because
// the caller might mutate it.
return clientmodel.COWMetric{
Metric: series.metric,
}
}
metric, err := s.persistence.getArchivedMetric(fp)
if err != nil {
glog.Errorf("Error retrieving archived metric for fingerprint %v: %v", fp, err)
}
return clientmodel.COWMetric{
Metric: metric,
}
}
// AppendSamples implements Storage.
func (s *memorySeriesStorage) AppendSamples(samples clientmodel.Samples) {
for _, sample := range samples {
if sample.Timestamp != s.appendLastTimestamp {
// Timestamp has changed. We have to wait for processing
// of all appended samples before proceeding. Otherwise,
// we might violate the storage contract that each
// sample appended to a given series has to have a
// timestamp greater or equal to the previous sample
// appended to that series.
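			// For example (illustrative, hypothetical input): for
			// the batch [A@t1, B@t1, A@t2], the two t1 samples are
			// handed to the append workers concurrently, but before
			// A@t2 is queued we wait for all t1 samples to be
			// processed, so A@t2 cannot overtake A@t1 within its
			// series.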
s.appendWaitGroup.Wait()
s.appendLastTimestamp = sample.Timestamp
}
s.appendWaitGroup.Add(1)
s.appendQueue <- sample
}
}
func (s *memorySeriesStorage) appendSample(sample *clientmodel.Sample) {
fp := sample.Metric.Fingerprint()
s.fpLocker.Lock(fp)
series := s.getOrCreateSeries(fp, sample.Metric)
chunkDescsToPersist := series.add(fp, &metric.SamplePair{
Value: sample.Value,
Timestamp: sample.Timestamp,
})
s.fpLocker.Unlock(fp)
s.ingestedSamplesCount.Inc()
if len(chunkDescsToPersist) == 0 {
return
}
// Queue only outside of the locked area, processing the persistQueue
// requires the same lock!
for _, cd := range chunkDescsToPersist {
s.persistQueue <- persistRequest{fp, cd}
}
// Count that a head chunk was persisted, but only best effort, i.e. we
// don't want to block here.
select {
case s.countPersistedHeadChunks <- struct{}{}: // Counted.
default: // Meh...
}
}
func (s *memorySeriesStorage) getOrCreateSeries(fp clientmodel.Fingerprint, m clientmodel.Metric) *memorySeries {
series, ok := s.fpToSeries.get(fp)
if !ok {
unarchived, firstTime, err := s.persistence.unarchiveMetric(fp)
if err != nil {
glog.Errorf("Error unarchiving fingerprint %v: %v", fp, err)
}
if unarchived {
s.seriesOps.WithLabelValues(unarchive).Inc()
} else {
// This was a genuinely new series, so index the metric.
s.persistence.indexMetric(fp, m)
s.seriesOps.WithLabelValues(create).Inc()
}
series = newMemorySeries(m, !unarchived, firstTime)
s.fpToSeries.put(fp, series)
s.numSeries.Inc()
}
return series
}
func (s *memorySeriesStorage) preloadChunksForRange(
fp clientmodel.Fingerprint,
from clientmodel.Timestamp, through clientmodel.Timestamp,
stalenessDelta time.Duration,
) ([]*chunkDesc, error) {
s.fpLocker.Lock(fp)
defer s.fpLocker.Unlock(fp)
series, ok := s.fpToSeries.get(fp)
if !ok {
has, first, last, err := s.persistence.hasArchivedMetric(fp)
if err != nil {
return nil, err
}
if !has {
s.invalidPreloadRequestsCount.Inc()
return nil, nil
}
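		// Unarchive only if the archived time range [first, last]
		// overlaps the requested range widened by the staleness delta.
		// For example (illustrative numbers): with from=12:00,
		// through=13:00, and a stalenessDelta of 5m, the series is
		// unarchived only if last is after 11:55 and first is before
		// 13:05.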
if from.Add(-stalenessDelta).Before(last) && through.Add(stalenessDelta).After(first) {
metric, err := s.persistence.getArchivedMetric(fp)
if err != nil {
return nil, err
}
series = s.getOrCreateSeries(fp, metric)
} else {
return nil, nil
}
}
return series.preloadChunksForRange(from, through, fp, s)
}
func (s *memorySeriesStorage) handleEvictList() {
ticker := time.NewTicker(maxEvictInterval)
count := 0
for {
// To batch up evictions a bit, this tries evictions at least
// once per evict interval, but earlier if the number of evict
// requests with evict==true that have happened since the last
// evict run is more than maxMemoryChunks/1000.
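		// For example (illustrative number): with maxMemoryChunks set
		// to 1,000,000, an eviction run is attempted after more than
		// 1,000 newly evictable chunks have been queued since the last
		// run, or after at most one maxEvictInterval (one minute) if
		// fewer have accumulated.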
select {
case req := <-s.evictRequests:
if req.evict {
req.cd.evictListElement = s.evictList.PushBack(req.cd)
count++
if count > s.maxMemoryChunks/1000 {
s.maybeEvict()
count = 0
}
} else {
if req.cd.evictListElement != nil {
s.evictList.Remove(req.cd.evictListElement)
req.cd.evictListElement = nil
}
}
case <-ticker.C:
if s.evictList.Len() > 0 {
s.maybeEvict()
}
case <-s.evictStopping:
// Drain evictRequests forever in a goroutine to not let
// requesters hang.
go func() {
for {
<-s.evictRequests
}
}()
ticker.Stop()
glog.Info("Chunk eviction stopped.")
close(s.evictStopped)
return
}
}
}
// maybeEvict is a local helper method. Must only be called by handleEvictList.
func (s *memorySeriesStorage) maybeEvict() {
numChunksToEvict := int(atomic.LoadInt64(&numMemChunks)) - s.maxMemoryChunks
if numChunksToEvict <= 0 {
return
}
chunkDescsToEvict := make([]*chunkDesc, numChunksToEvict)
for i := range chunkDescsToEvict {
e := s.evictList.Front()
if e == nil {
break
}
cd := e.Value.(*chunkDesc)
cd.evictListElement = nil
chunkDescsToEvict[i] = cd
s.evictList.Remove(e)
}
// Do the actual eviction in a goroutine as we might otherwise deadlock,
// in the following way: A chunk was unpinned completely and therefore
// scheduled for eviction. At the time we actually try to evict it,
// another goroutine is pinning the chunk. The pinning goroutine has
// currently locked the chunk and tries to send the evict request (to
// remove the chunk from the evict list) to the evictRequests
// channel. The send blocks because evictRequests is full. However, the
// goroutine that is supposed to empty the channel is waiting for the
// chunkDesc lock to try to evict the chunk.
go func() {
for _, cd := range chunkDescsToEvict {
if cd == nil {
break
}
cd.maybeEvict()
// We don't care if the eviction succeeds. If the chunk
// was pinned in the meantime, it will be added to the
// evict list once it gets unpinned again.
}
}()
}
func (s *memorySeriesStorage) handlePersistQueue() {
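	// chunkMaps buffers all chunks waiting to be persisted, keyed by
	// fingerprint (see the chunkMaps type at the end of this file), and
	// chunkCount tracks their total number. Once chunkCount reaches
	// s.persistQueueCap, persisting takes priority over accepting new
	// requests, which fills up the (deliberately small) persistQueue
	// channel and thereby backpressures ingestion. For example
	// (illustrative number): with a persistQueueCap of 1024, the warning
	// below fires as soon as 1024 chunks have piled up in chunkMaps.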
chunkMaps := chunkMaps{}
chunkCount := 0
persistMostConsecutiveChunks := func() {
fp, cds := chunkMaps.pop()
if err := s.persistChunks(fp, cds); err != nil {
// Need to put chunks back for retry.
for _, cd := range cds {
chunkMaps.add(fp, cd)
}
return
}
chunkCount -= len(cds)
}
loop:
for {
if chunkCount >= s.persistQueueCap && chunkCount > 0 {
glog.Warningf("%d chunks queued for persistence. Ingestion pipeline will backlog.", chunkCount)
persistMostConsecutiveChunks()
}
select {
case req, ok := <-s.persistQueue:
if !ok {
break loop
}
chunkMaps.add(req.fingerprint, req.chunkDesc)
chunkCount++
default:
if chunkCount > 0 {
persistMostConsecutiveChunks()
continue loop
}
// If we are here, there is nothing to do right now. So
// just wait for a persist request to come in.
req, ok := <-s.persistQueue
if !ok {
break loop
}
chunkMaps.add(req.fingerprint, req.chunkDesc)
chunkCount++
}
s.persistQueueLength.Set(float64(chunkCount))
}
// Drain all requests.
for _, m := range chunkMaps {
for fp, cds := range m {
if s.persistChunks(fp, cds) == nil {
chunkCount -= len(cds)
if (chunkCount+len(cds))/1000 > chunkCount/1000 {
glog.Infof(
"Still draining persist queue, %d chunks left to persist...",
chunkCount,
)
}
s.persistQueueLength.Set(float64(chunkCount))
}
}
}
glog.Info("Persist queue drained and stopped.")
close(s.persistStopped)
}
func (s *memorySeriesStorage) persistChunks(fp clientmodel.Fingerprint, cds []*chunkDesc) error {
start := time.Now()
chunks := make([]chunk, len(cds))
for i, cd := range cds {
chunks[i] = cd.chunk
}
s.fpLocker.Lock(fp)
offset, err := s.persistence.persistChunks(fp, chunks)
if series, seriesInMemory := s.fpToSeries.get(fp); err == nil && seriesInMemory && series.chunkDescsOffset == -1 {
// This is the first chunk persisted for a newly created
// series that had prior chunks on disk. Finally, we can
// set the chunkDescsOffset.
series.chunkDescsOffset = offset
}
s.fpLocker.Unlock(fp)
s.persistLatency.Observe(float64(time.Since(start)) / float64(time.Microsecond))
if err != nil {
s.persistErrors.Inc()
glog.Error("Error persisting chunks: ", err)
s.persistence.setDirty(true)
return err
}
for _, cd := range cds {
cd.unpin(s.evictRequests)
}
chunkOps.WithLabelValues(persistAndUnpin).Add(float64(len(cds)))
return nil
}
// waitForNextFP waits an estimated duration, after which we want to process
// another fingerprint so that we will process all fingerprints in a tenth of
// s.dropAfter, assuming that the system is doing nothing else. E.g. if we want
// to drop chunks after 40h, we want to cycle through all fingerprints within
// 4h. However, the maximum sweep time is capped at fpMaxSweepTime. Furthermore,
// this method will always wait for at least fpMinWaitDuration and never longer
// than fpMaxWaitDuration. If s.loopStopping is closed, it returns false
// immediately. The estimation is based on the total number of fingerprints as
// passed in.
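// For example (illustrative numbers): with dropAfter = 40h, the target sweep
// time is 4h (40h/10, below the 6h fpMaxSweepTime cap). With 100,000
// fingerprints, that yields a wait of 4h/100,000 = 144ms per fingerprint,
// which lies within the [fpMinWaitDuration, fpMaxWaitDuration] bounds of
// [20ms, 10s].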
func (s *memorySeriesStorage) waitForNextFP(numberOfFPs int) bool {
d := fpMaxWaitDuration
if numberOfFPs != 0 {
sweepTime := s.dropAfter / 10
if sweepTime > fpMaxSweepTime {
sweepTime = fpMaxSweepTime
}
d = sweepTime / time.Duration(numberOfFPs)
if d < fpMinWaitDuration {
d = fpMinWaitDuration
}
if d > fpMaxWaitDuration {
d = fpMaxWaitDuration
}
}
t := time.NewTimer(d)
select {
case <-t.C:
return true
case <-s.loopStopping:
return false
}
}
// cycleThroughMemoryFingerprints returns a channel that emits fingerprints for
// series in memory in a throttled fashion. It continues to cycle through all
// fingerprints in memory until s.loopStopping is closed.
func (s *memorySeriesStorage) cycleThroughMemoryFingerprints() chan clientmodel.Fingerprint {
memoryFingerprints := make(chan clientmodel.Fingerprint)
go func() {
var fpIter <-chan clientmodel.Fingerprint
defer func() {
if fpIter != nil {
for range fpIter {
// Consume the iterator.
}
}
close(memoryFingerprints)
}()
for {
// Initial wait, also important if there are no FPs yet.
if !s.waitForNextFP(s.fpToSeries.length()) {
return
}
begin := time.Now()
fpIter = s.fpToSeries.fpIter()
count := 0
for fp := range fpIter {
select {
case memoryFingerprints <- fp:
case <-s.loopStopping:
return
}
s.waitForNextFP(s.fpToSeries.length())
count++
}
if count > 0 {
glog.Infof(
"Completed maintenance sweep through %d in-memory fingerprints in %v.",
count, time.Since(begin),
)
}
}
}()
return memoryFingerprints
}
// cycleThroughArchivedFingerprints returns a channel that emits fingerprints
// for archived series in a throttled fashion. It continues to cycle through all
// archived fingerprints until s.loopStopping is closed.
func (s *memorySeriesStorage) cycleThroughArchivedFingerprints() chan clientmodel.Fingerprint {
archivedFingerprints := make(chan clientmodel.Fingerprint)
go func() {
defer close(archivedFingerprints)
for {
archivedFPs, err := s.persistence.getFingerprintsModifiedBefore(
clientmodel.TimestampFromTime(time.Now()).Add(-s.dropAfter),
)
if err != nil {
glog.Error("Failed to lookup archived fingerprint ranges: ", err)
s.waitForNextFP(0)
continue
}
// Initial wait, also important if there are no FPs yet.
if !s.waitForNextFP(len(archivedFPs)) {
return
}
begin := time.Now()
for _, fp := range archivedFPs {
select {
case archivedFingerprints <- fp:
case <-s.loopStopping:
return
}
s.waitForNextFP(len(archivedFPs))
}
if len(archivedFPs) > 0 {
glog.Infof(
"Completed maintenance sweep through %d archived fingerprints in %v.",
len(archivedFPs), time.Since(begin),
)
}
}
}()
return archivedFingerprints
}
func (s *memorySeriesStorage) loop() {
checkpointTimer := time.NewTimer(s.checkpointInterval)
// We take the number of head chunks persisted since the last checkpoint
// as an approximation for the number of series that are "dirty",
// i.e. whose head chunk is different from the one in the most recent
// checkpoint or for which the fact that the head chunk has been
// persisted is not reflected in the most recent checkpoint. This count
// could overestimate the number of dirty series, but it's good enough
// as a heuristic.
headChunksPersistedSinceLastCheckpoint := 0
defer func() {
checkpointTimer.Stop()
glog.Info("Maintenance loop stopped.")
close(s.loopStopped)
}()
memoryFingerprints := s.cycleThroughMemoryFingerprints()
archivedFingerprints := s.cycleThroughArchivedFingerprints()
loop:
for {
select {
case <-s.loopStopping:
break loop
case <-checkpointTimer.C:
s.persistence.checkpointSeriesMapAndHeads(s.fpToSeries, s.fpLocker)
headChunksPersistedSinceLastCheckpoint = 0
checkpointTimer.Reset(s.checkpointInterval)
case fp := <-memoryFingerprints:
s.maintainMemorySeries(fp, clientmodel.TimestampFromTime(time.Now()).Add(-s.dropAfter))
case fp := <-archivedFingerprints:
s.maintainArchivedSeries(fp, clientmodel.TimestampFromTime(time.Now()).Add(-s.dropAfter))
case <-s.countPersistedHeadChunks:
headChunksPersistedSinceLastCheckpoint++
// Check if we have enough "dirty" series so that we need an early checkpoint.
// As described above, we take the headChunksPersistedSinceLastCheckpoint as a
// heuristic for "dirty" series. However, if we are already backlogging
// chunks to be persisted, creating a checkpoint would be counterproductive,
// as it would slow down chunk persisting even more, while in a situation like
// that, the best we can do for crash recovery is to work through the persist
// queue as quickly as possible. So only checkpoint if s.persistQueue is
// less than 20% full.
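// For example (illustrative limit): with a checkpointDirtySeriesLimit of
// 5000 and the persistQueue channel capacity of 1024 set above, an early
// checkpoint is triggered once at least 5000 head-chunk persists have been
// counted since the last checkpoint, but only while fewer than 204 (1024/5)
// persist requests are waiting in the channel.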
if headChunksPersistedSinceLastCheckpoint >= s.checkpointDirtySeriesLimit &&
len(s.persistQueue) < cap(s.persistQueue)/5 {
checkpointTimer.Reset(0)
}
}
}
// Wait until both channels are closed.
for range memoryFingerprints {
}
for range archivedFingerprints {
}
}
// maintainMemorySeries first purges the series from old chunks. If the series
// still exists after that, it proceeds with the following steps: It closes the
// head chunk if it was not touched for a while. It archives a series if all
// chunks are evicted. It evicts chunkDescs if there are too many.
func (s *memorySeriesStorage) maintainMemorySeries(fp clientmodel.Fingerprint, beforeTime clientmodel.Timestamp) {
var headChunkToPersist *chunkDesc
s.fpLocker.Lock(fp)
defer func() {
s.fpLocker.Unlock(fp)
// Queue outside of lock!
if headChunkToPersist != nil {
s.persistQueue <- persistRequest{fp, headChunkToPersist}
// Count that a head chunk was persisted, but only best effort, i.e. we
// don't want to block here.
select {
case s.countPersistedHeadChunks <- struct{}{}: // Counted.
default: // Meh...
}
}
}()
series, ok := s.fpToSeries.get(fp)
if !ok {
// Series is actually not in memory, perhaps archived or dropped in the meantime.
return
}
defer s.seriesOps.WithLabelValues(memoryMaintenance).Inc()
if s.purgeMemorySeries(fp, series, beforeTime) {
// Series is gone now, we are done.
return
}
iOldestNotEvicted := -1
for i, cd := range series.chunkDescs {
if !cd.isEvicted() {
iOldestNotEvicted = i
break
}
}
// Archive if all chunks are evicted.
if iOldestNotEvicted == -1 {
s.fpToSeries.del(fp)
s.numSeries.Dec()
// Make sure we have a head chunk descriptor (a freshly
// unarchived series has none).
if len(series.chunkDescs) == 0 {
cds, err := s.loadChunkDescs(fp, clientmodel.Latest)
if err != nil {
glog.Errorf(
"Could not load chunk descriptors prior to archiving metric %v, metric will not be archived: %v",
series.metric, err,
)
return
}
series.chunkDescs = cds
}
if err := s.persistence.archiveMetric(
fp, series.metric, series.firstTime(), series.head().lastTime(),
); err != nil {
glog.Errorf("Error archiving metric %v: %v", series.metric, err)
return
}
s.seriesOps.WithLabelValues(archive).Inc()
return
}
// If we are here, the series is not archived, so check for chunkDesc
// eviction next and then if the head chunk needs to be persisted.
series.evictChunkDescs(iOldestNotEvicted)
if !series.headChunkPersisted && time.Now().Sub(series.head().lastTime().Time()) > headChunkTimeout {
series.headChunkPersisted = true
// Since we cannot modify the head chunk from now on, we
// don't need to bother with cloning anymore.
series.headChunkUsedByIterator = false
headChunkToPersist = series.head()
}
}
// purgeMemorySeries drops chunks older than beforeTime from the provided memory
// series. The caller must have locked fp. If the series contains no chunks
// after dropping old chunks, it is purged entirely. In that case, the method
// returns true.
func (s *memorySeriesStorage) purgeMemorySeries(fp clientmodel.Fingerprint, series *memorySeries, beforeTime clientmodel.Timestamp) bool {
if !series.firstTime().Before(beforeTime) {
// Oldest sample not old enough.
return false
}
newFirstTime, numDroppedFromPersistence, allDroppedFromPersistence, err := s.persistence.dropChunks(fp, beforeTime)
if err != nil {
glog.Error("Error dropping persisted chunks: ", err)
}
numDroppedFromMemory, allDroppedFromMemory := series.dropChunks(beforeTime)
if allDroppedFromPersistence && allDroppedFromMemory {
s.fpToSeries.del(fp)
s.numSeries.Dec()
s.seriesOps.WithLabelValues(memoryPurge).Inc()
s.persistence.unindexMetric(fp, series.metric)
return true
}
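	// Assuming chunkDescsOffset is the index of series.chunkDescs[0]
	// within the full sequence of chunks of this series (persisted and
	// in-memory), the adjustment below shifts it by the difference of the
	// two drop counts. Illustrative example (hypothetical numbers): with
	// an offset of 5, dropping 7 chunks from persistence but only 2 from
	// memory makes the former chunkDescs[2] the new chunkDescs[0], now at
	// overall index 0, i.e. 5 + 2 - 7 = 0.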
if series.chunkDescsOffset != -1 {
series.savedFirstTime = newFirstTime
series.chunkDescsOffset += numDroppedFromMemory - numDroppedFromPersistence
if series.chunkDescsOffset < 0 {
panic("dropped more chunks from persistence than from memory")
}
}
return false
}
// maintainArchivedSeries drops chunks older than beforeTime from an archived
// series. If the series contains no chunks after that, it is purged entirely.
func (s *memorySeriesStorage) maintainArchivedSeries(fp clientmodel.Fingerprint, beforeTime clientmodel.Timestamp) {
s.fpLocker.Lock(fp)
defer s.fpLocker.Unlock(fp)
has, firstTime, lastTime, err := s.persistence.hasArchivedMetric(fp)
if err != nil {
glog.Error("Error looking up archived time range: ", err)
return
}
if !has || !firstTime.Before(beforeTime) {
// Oldest sample not old enough, or metric purged or unarchived in the meantime.
return
}
defer s.seriesOps.WithLabelValues(archiveMaintenance).Inc()
newFirstTime, _, allDropped, err := s.persistence.dropChunks(fp, beforeTime)
if err != nil {
glog.Error("Error dropping persisted chunks: ", err)
}
if allDropped {
if err := s.persistence.purgeArchivedMetric(fp); err != nil {
glog.Errorf("Error purging archived metric for fingerprint %v: %v", fp, err)
return
}
s.seriesOps.WithLabelValues(archivePurge).Inc()
return
}
s.persistence.updateArchivedTimeRange(fp, newFirstTime, lastTime)
}
// See persistence.loadChunks for detailed explanation.
func (s *memorySeriesStorage) loadChunks(fp clientmodel.Fingerprint, indexes []int, indexOffset int) ([]chunk, error) {
return s.persistence.loadChunks(fp, indexes, indexOffset)
}
// See persistence.loadChunkDescs for detailed explanation.
func (s *memorySeriesStorage) loadChunkDescs(fp clientmodel.Fingerprint, beforeTime clientmodel.Timestamp) ([]*chunkDesc, error) {
return s.persistence.loadChunkDescs(fp, beforeTime)
}
// Describe implements prometheus.Collector.
func (s *memorySeriesStorage) Describe(ch chan<- *prometheus.Desc) {
s.persistence.Describe(ch)
ch <- s.persistLatency.Desc()
ch <- s.persistErrors.Desc()
ch <- s.persistQueueCapacity.Desc()
ch <- s.persistQueueLength.Desc()
ch <- s.numSeries.Desc()
s.seriesOps.Describe(ch)
ch <- s.ingestedSamplesCount.Desc()
ch <- s.invalidPreloadRequestsCount.Desc()
ch <- numMemChunksDesc
}
// Collect implements prometheus.Collector.
func (s *memorySeriesStorage) Collect(ch chan<- prometheus.Metric) {
s.persistence.Collect(ch)
ch <- s.persistLatency
ch <- s.persistErrors
ch <- s.persistQueueCapacity
ch <- s.persistQueueLength
ch <- s.numSeries
s.seriesOps.Collect(ch)
ch <- s.ingestedSamplesCount
ch <- s.invalidPreloadRequestsCount
ch <- prometheus.MustNewConstMetric(
numMemChunksDesc,
prometheus.GaugeValue,
float64(atomic.LoadInt64(&numMemChunks)))
}
// chunkMaps is a slice of maps with chunkDescs to be persisted.
// The map at index i holds, per fingerprint, i+1 consecutive chunks to
// persist.
type chunkMaps []map[clientmodel.Fingerprint][]*chunkDesc
// add adds a chunk to chunkMaps.
func (cm *chunkMaps) add(fp clientmodel.Fingerprint, cd *chunkDesc) {
// The runtime of this method is linear in the number of maps in cm.
// However, we expect there to be only ever very few maps.
numMaps := len(*cm)
for i, m := range *cm {
if cds, ok := m[fp]; ok {
// Found our fp! Add cd and level up.
cds = append(cds, cd)
delete(m, fp)
if i == numMaps-1 {
*cm = append(*cm, map[clientmodel.Fingerprint][]*chunkDesc{})
}
(*cm)[i+1][fp] = cds
return
}
}
// Our fp isn't contained in cm yet. Add it to the first map (and add a
// first map if there is none).
if numMaps == 0 {
*cm = chunkMaps{map[clientmodel.Fingerprint][]*chunkDesc{}}
}
(*cm)[0][fp] = []*chunkDesc{cd}
}
// pop retrieves and removes a fingerprint with all its chunks. It chooses one
// of the fingerprints with the most chunks. It panics if cm has no entries.
func (cm *chunkMaps) pop() (clientmodel.Fingerprint, []*chunkDesc) {
m := (*cm)[len(*cm)-1]
for fp, cds := range m {
delete(m, fp)
// Prune empty maps from top level.
for len(m) == 0 {
*cm = (*cm)[:len(*cm)-1]
if len(*cm) == 0 {
break
}
m = (*cm)[len(*cm)-1]
}
return fp, cds
}
panic("popped from empty chunkMaps")
}
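// exampleChunkMapsLeveling is an illustrative sketch (a hypothetical function,
// not used by the storage) of how chunkMaps levels fingerprints by the number
// of consecutive chunks queued for them: each add moves a fingerprint one map
// up, and pop drains a fingerprint from the highest populated level, i.e. one
// with the most queued chunks.
func exampleChunkMapsLeveling() {
	cm := chunkMaps{}
	fpA := clientmodel.Fingerprint(1)
	fpB := clientmodel.Fingerprint(2)
	// Queue three chunks for fpA and one for fpB.
	cm.add(fpA, &chunkDesc{})
	cm.add(fpA, &chunkDesc{})
	cm.add(fpA, &chunkDesc{})
	cm.add(fpB, &chunkDesc{})
	// fpA now lives in cm[2] with three chunks, fpB in cm[0] with one.
	fp, cds := cm.pop() // Returns fpA with its three chunks.
	glog.Infof("popped %v with %d chunks", fp, len(cds))
}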