prometheus/storage/metric/tiered.go

599 lines
17 KiB
Go

// Copyright 2013 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package metric
import (
"fmt"
"github.com/prometheus/prometheus/coding"
"github.com/prometheus/prometheus/coding/indexable"
"github.com/prometheus/prometheus/model"
dto "github.com/prometheus/prometheus/model/generated"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/storage/raw/leveldb"
"log"
"sort"
"sync"
"time"
)
// TieredStorage both persists samples and generates materialized views for
// queries.
type TieredStorage struct {
// BUG(matt): This introduces a Law of Demeter violation. Ugh.
DiskStorage *LevelDBMetricPersistence
appendToDiskQueue chan model.Samples
appendToMemoryQueue chan model.Samples
diskFrontier *diskFrontier
draining chan chan bool
flushMemoryInterval time.Duration
memoryArena memorySeriesStorage
memoryTTL time.Duration
mutex sync.Mutex
viewQueue chan viewJob
writeMemoryInterval time.Duration
}
// viewJob encapsulates a request to extract sample values from the datastore.
type viewJob struct {
builder ViewRequestBuilder
output chan View
abort chan bool
err chan error
}
func NewTieredStorage(appendToMemoryQueueDepth, appendToDiskQueueDepth, viewQueueDepth uint, flushMemoryInterval, writeMemoryInterval, memoryTTL time.Duration, root string) (storage *TieredStorage, err error) {
diskStorage, err := NewLevelDBMetricPersistence(root)
if err != nil {
return
}
storage = &TieredStorage{
appendToDiskQueue: make(chan model.Samples, appendToDiskQueueDepth),
appendToMemoryQueue: make(chan model.Samples, appendToMemoryQueueDepth),
DiskStorage: diskStorage,
draining: make(chan chan bool),
flushMemoryInterval: flushMemoryInterval,
memoryArena: NewMemorySeriesStorage(),
memoryTTL: memoryTTL,
viewQueue: make(chan viewJob, viewQueueDepth),
writeMemoryInterval: writeMemoryInterval,
}
return
}
// Enqueues Samples for storage.
func (t *TieredStorage) AppendSamples(s model.Samples) (err error) {
if len(t.draining) > 0 {
return fmt.Errorf("Storage is in the process of draining.")
}
t.appendToMemoryQueue <- s
return
}
// Stops the storage subsystem, flushing all pending operations.
func (t *TieredStorage) Drain() {
log.Println("Starting drain...")
drainingDone := make(chan bool)
if len(t.draining) == 0 {
t.draining <- drainingDone
}
<-drainingDone
log.Println("Done.")
}
// Enqueues a ViewRequestBuilder for materialization, subject to a timeout.
func (t *TieredStorage) MakeView(builder ViewRequestBuilder, deadline time.Duration) (view View, err error) {
if len(t.draining) > 0 {
err = fmt.Errorf("Storage is in the process of draining.")
return
}
// The result channel needs a one-element buffer in case we have timed out in
// MakeView, but the view rendering still completes afterwards and writes to
// the channel.
result := make(chan View, 1)
// The abort channel needs a one-element buffer in case the view rendering
// has already exited and doesn't consume from the channel anymore.
abortChan := make(chan bool, 1)
errChan := make(chan error)
t.viewQueue <- viewJob{
builder: builder,
output: result,
abort: abortChan,
err: errChan,
}
select {
case value := <-result:
view = value
case err = <-errChan:
return
case <-time.After(deadline):
abortChan <- true
err = fmt.Errorf("MakeView timed out after %s.", deadline)
}
return
}
func (t *TieredStorage) rebuildDiskFrontier(i leveldb.Iterator) (err error) {
begin := time.Now()
defer func() {
duration := time.Since(begin)
recordOutcome(duration, err, map[string]string{operation: appendSample, result: success}, map[string]string{operation: rebuildDiskFrontier, result: failure})
}()
t.diskFrontier, err = newDiskFrontier(i)
if err != nil {
return
}
return
}
// Starts serving requests.
func (t *TieredStorage) Serve() {
flushMemoryTicker := time.NewTicker(t.flushMemoryInterval)
defer flushMemoryTicker.Stop()
writeMemoryTicker := time.NewTicker(t.writeMemoryInterval)
defer writeMemoryTicker.Stop()
reportTicker := time.NewTicker(time.Second)
defer reportTicker.Stop()
go func() {
for _ = range reportTicker.C {
t.reportQueues()
}
}()
for {
select {
case <-writeMemoryTicker.C:
t.writeMemory()
case <-flushMemoryTicker.C:
t.flushMemory()
case viewRequest := <-t.viewQueue:
t.renderView(viewRequest)
case drainingDone := <-t.draining:
t.flush()
drainingDone <- true
return
}
}
}
func (t *TieredStorage) reportQueues() {
queueSizes.Set(map[string]string{"queue": "append_to_disk", "facet": "occupancy"}, float64(len(t.appendToDiskQueue)))
queueSizes.Set(map[string]string{"queue": "append_to_disk", "facet": "capacity"}, float64(cap(t.appendToDiskQueue)))
queueSizes.Set(map[string]string{"queue": "append_to_memory", "facet": "occupancy"}, float64(len(t.appendToMemoryQueue)))
queueSizes.Set(map[string]string{"queue": "append_to_memory", "facet": "capacity"}, float64(cap(t.appendToMemoryQueue)))
queueSizes.Set(map[string]string{"queue": "view_generation", "facet": "occupancy"}, float64(len(t.viewQueue)))
queueSizes.Set(map[string]string{"queue": "view_generation", "facet": "capacity"}, float64(cap(t.viewQueue)))
}
func (t *TieredStorage) writeMemory() {
begin := time.Now()
defer func() {
duration := time.Since(begin)
recordOutcome(duration, nil, map[string]string{operation: appendSample, result: success}, map[string]string{operation: writeMemory, result: failure})
}()
t.mutex.Lock()
defer t.mutex.Unlock()
pendingLength := len(t.appendToMemoryQueue)
for i := 0; i < pendingLength; i++ {
t.memoryArena.AppendSamples(<-t.appendToMemoryQueue)
}
}
func (t *TieredStorage) Flush() {
t.flush()
}
func (t *TieredStorage) Close() {
log.Println("Closing tiered storage...")
t.Drain()
t.DiskStorage.Close()
t.memoryArena.Close()
close(t.appendToDiskQueue)
close(t.appendToMemoryQueue)
close(t.viewQueue)
log.Println("Done.")
}
// Write all pending appends.
func (t *TieredStorage) flush() (err error) {
// Trim any old values to reduce iterative write costs.
t.flushMemory()
t.writeMemory()
t.flushMemory()
return
}
type memoryToDiskFlusher struct {
toDiskQueue chan model.Samples
disk MetricPersistence
olderThan time.Time
valuesAccepted int
valuesRejected int
}
type memoryToDiskFlusherVisitor struct {
stream stream
flusher *memoryToDiskFlusher
}
func (f memoryToDiskFlusherVisitor) DecodeKey(in interface{}) (out interface{}, err error) {
out = time.Time(in.(skipListTime))
return
}
func (f memoryToDiskFlusherVisitor) DecodeValue(in interface{}) (out interface{}, err error) {
out = in.(value).get()
return
}
func (f memoryToDiskFlusherVisitor) Filter(key, value interface{}) (filterResult storage.FilterResult) {
var (
recordTime = key.(time.Time)
)
if recordTime.Before(f.flusher.olderThan) {
f.flusher.valuesAccepted++
return storage.ACCEPT
}
f.flusher.valuesRejected++
return storage.STOP
}
func (f memoryToDiskFlusherVisitor) Operate(key, value interface{}) (err *storage.OperatorError) {
var (
recordTime = key.(time.Time)
recordValue = value.(model.SampleValue)
)
if len(f.flusher.toDiskQueue) == cap(f.flusher.toDiskQueue) {
f.flusher.Flush()
}
f.flusher.toDiskQueue <- model.Samples{
model.Sample{
Metric: f.stream.metric,
Timestamp: recordTime,
Value: recordValue,
},
}
f.stream.values.Delete(skipListTime(recordTime))
return
}
func (f *memoryToDiskFlusher) ForStream(stream stream) (decoder storage.RecordDecoder, filter storage.RecordFilter, operator storage.RecordOperator) {
visitor := memoryToDiskFlusherVisitor{
stream: stream,
flusher: f,
}
return visitor, visitor, visitor
}
func (f *memoryToDiskFlusher) Flush() {
length := len(f.toDiskQueue)
samples := model.Samples{}
for i := 0; i < length; i++ {
samples = append(samples, <-f.toDiskQueue...)
}
f.disk.AppendSamples(samples)
}
func (f memoryToDiskFlusher) Close() {
f.Flush()
}
// Persist a whole bunch of samples to the datastore.
func (t *TieredStorage) flushMemory() {
begin := time.Now()
defer func() {
duration := time.Since(begin)
recordOutcome(duration, nil, map[string]string{operation: appendSample, result: success}, map[string]string{operation: flushMemory, result: failure})
}()
t.mutex.Lock()
defer t.mutex.Unlock()
flusher := &memoryToDiskFlusher{
disk: t.DiskStorage,
olderThan: time.Now().Add(-1 * t.memoryTTL),
toDiskQueue: t.appendToDiskQueue,
}
defer flusher.Close()
t.memoryArena.ForEachSample(flusher)
return
}
func (t *TieredStorage) renderView(viewJob viewJob) {
// Telemetry.
var err error
begin := time.Now()
defer func() {
duration := time.Since(begin)
recordOutcome(duration, err, map[string]string{operation: renderView, result: success}, map[string]string{operation: renderView, result: failure})
}()
t.mutex.Lock()
defer t.mutex.Unlock()
var (
scans = viewJob.builder.ScanJobs()
view = newView()
// Get a single iterator that will be used for all data extraction below.
iterator = t.DiskStorage.MetricSamples.NewIterator(true)
)
defer iterator.Close()
// Rebuilding of the frontier should happen on a conditional basis if a
// (fingerprint, timestamp) tuple is outside of the current frontier.
err = t.rebuildDiskFrontier(iterator)
if err != nil {
panic(err)
}
for _, scanJob := range scans {
var seriesFrontier *seriesFrontier = nil
if t.diskFrontier != nil {
seriesFrontier, err = newSeriesFrontier(scanJob.fingerprint, *t.diskFrontier, iterator)
if err != nil {
panic(err)
}
}
standingOps := scanJob.operations
for len(standingOps) > 0 {
// Abort the view rendering if the caller (MakeView) has timed out.
if len(viewJob.abort) > 0 {
return
}
// Load data value chunk(s) around the first standing op's current time.
targetTime := *standingOps[0].CurrentTime()
chunk := model.Values{}
memValues := t.memoryArena.GetValueAtTime(scanJob.fingerprint, targetTime)
// If we aimed before the oldest value in memory, load more data from disk.
if (len(memValues) == 0 || memValues.FirstTimeAfter(targetTime)) && seriesFrontier != nil {
// XXX: For earnest performance gains analagous to the benchmarking we
// performed, chunk should only be reloaded if it no longer contains
// the values we're looking for.
//
// To better understand this, look at https://github.com/prometheus/prometheus/blob/benchmark/leveldb/iterator-seek-characteristics/leveldb.go#L239 and note the behavior around retrievedValue.
diskValues := t.loadChunkAroundTime(iterator, seriesFrontier, scanJob.fingerprint, targetTime)
// If we aimed past the newest value on disk, combine it with the next value from memory.
if len(memValues) > 0 && diskValues.LastTimeBefore(targetTime) {
latestDiskValue := diskValues[len(diskValues)-1:]
chunk = append(latestDiskValue, memValues...)
} else {
chunk = diskValues
}
} else {
chunk = memValues
}
// There's no data at all for this fingerprint, so stop processing ops for it.
if len(chunk) == 0 {
break
}
chunk = chunk.TruncateBefore(targetTime)
lastChunkTime := chunk[len(chunk)-1].Timestamp
if lastChunkTime.After(targetTime) {
targetTime = lastChunkTime
}
// For each op, extract all needed data from the current chunk.
out := model.Values{}
for _, op := range standingOps {
if op.CurrentTime().After(targetTime) {
break
}
chunk = chunk.TruncateBefore(*(op.CurrentTime()))
for op.CurrentTime() != nil && !op.CurrentTime().After(targetTime) {
out = op.ExtractSamples(chunk)
}
}
// Append the extracted samples to the materialized view.
for _, sample := range out {
view.appendSample(scanJob.fingerprint, sample.Timestamp, sample.Value)
}
// Throw away standing ops which are finished.
filteredOps := ops{}
for _, op := range standingOps {
if op.CurrentTime() != nil {
filteredOps = append(filteredOps, op)
}
}
standingOps = filteredOps
// Sort ops by start time again, since they might be slightly off now.
// For example, consider a current chunk of values and two interval ops
// with different interval lengths. Their states after the cycle above
// could be:
//
// (C = current op time)
//
// Chunk: [ X X X X X ]
// Op 1: [ X X C . . . ]
// Op 2: [ X X C . . .]
//
// Op 2 now has an earlier current time than Op 1.
sort.Sort(startsAtSort{standingOps})
}
}
viewJob.output <- view
return
}
func (t *TieredStorage) loadChunkAroundTime(iterator leveldb.Iterator, frontier *seriesFrontier, fingerprint model.Fingerprint, ts time.Time) (chunk model.Values) {
var (
targetKey = &dto.SampleKey{
Fingerprint: fingerprint.ToDTO(),
}
foundKey model.SampleKey
foundValues model.Values
)
// Limit the target key to be within the series' keyspace.
if ts.After(frontier.lastSupertime) {
targetKey.Timestamp = indexable.EncodeTime(frontier.lastSupertime)
} else {
targetKey.Timestamp = indexable.EncodeTime(ts)
}
// Try seeking to target key.
rawKey, _ := coding.NewProtocolBuffer(targetKey).Encode()
iterator.Seek(rawKey)
foundKey, err := extractSampleKey(iterator)
if err != nil {
panic(err)
}
// Figure out if we need to rewind by one block.
// Imagine the following supertime blocks with time ranges:
//
// Block 1: ft 1000 - lt 1009 <data>
// Block 1: ft 1010 - lt 1019 <data>
//
// If we are aiming to find time 1005, we would first seek to the block with
// supertime 1010, then need to rewind by one block by virtue of LevelDB
// iterator seek behavior.
//
// Only do the rewind if there is another chunk before this one.
rewound := false
firstTime := foundKey.FirstTimestamp
if ts.Before(firstTime) && !frontier.firstSupertime.After(ts) {
iterator.Previous()
rewound = true
}
foundValues, err = extractSampleValues(iterator)
if err != nil {
return
}
// If we rewound, but the target time is still past the current block, return
// the last value of the current (rewound) block and the entire next block.
if rewound {
foundKey, err = extractSampleKey(iterator)
if err != nil {
return
}
currentChunkLastTime := foundKey.LastTimestamp
if ts.After(currentChunkLastTime) {
sampleCount := len(foundValues)
chunk = append(chunk, foundValues[sampleCount-1])
// We know there's a next block since we have rewound from it.
iterator.Next()
foundValues, err = extractSampleValues(iterator)
if err != nil {
return
}
}
}
// Now append all the samples of the currently seeked block to the output.
chunk = append(chunk, foundValues...)
return
}
// Get all label values that are associated with the provided label name.
func (t *TieredStorage) GetAllValuesForLabel(labelName model.LabelName) (values model.LabelValues, err error) {
diskValues, err := t.DiskStorage.GetAllValuesForLabel(labelName)
if err != nil {
return
}
memoryValues, err := t.memoryArena.GetAllValuesForLabel(labelName)
if err != nil {
return
}
valueSet := map[model.LabelValue]bool{}
for _, value := range append(diskValues, memoryValues...) {
if !valueSet[value] {
values = append(values, value)
valueSet[value] = true
}
}
return
}
// Get all of the metric fingerprints that are associated with the provided
// label set.
func (t *TieredStorage) GetFingerprintsForLabelSet(labelSet model.LabelSet) (fingerprints model.Fingerprints, err error) {
memFingerprints, err := t.memoryArena.GetFingerprintsForLabelSet(labelSet)
if err != nil {
return
}
diskFingerprints, err := t.DiskStorage.GetFingerprintsForLabelSet(labelSet)
if err != nil {
return
}
fingerprintSet := map[model.Fingerprint]bool{}
for _, fingerprint := range append(memFingerprints, diskFingerprints...) {
fingerprintSet[fingerprint] = true
}
for fingerprint := range fingerprintSet {
fingerprints = append(fingerprints, fingerprint)
}
return
}
// Get the metric associated with the provided fingerprint.
func (t *TieredStorage) GetMetricForFingerprint(f model.Fingerprint) (m *model.Metric, err error) {
m, err = t.memoryArena.GetMetricForFingerprint(f)
if err != nil {
return
}
if m == nil {
m, err = t.DiskStorage.GetMetricForFingerprint(f)
}
return
}