prometheus/storage/metric/curator.go
Matt T. Proud 4a87c002e8 Update low-level i'faces to reflect wireformats.
This commit fixes a critique of the old storage API design, whereby
the input parameters were always as raw bytes and never Protocol
Buffer messages that encapsulated the data, meaning every place a
read or mutation was conducted needed to manually perform said
translations on its own.  This is taxing.

Change-Id: I4786938d0d207cefb7782bd2bd96a517eead186f
2013-09-04 17:13:58 +02:00

487 lines
13 KiB
Go

// Copyright 2013 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package metric
import (
"bytes"
"errors"
"fmt"
"strings"
"time"
"code.google.com/p/goprotobuf/proto"
"github.com/golang/glog"
clientmodel "github.com/prometheus/client_golang/model"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/storage/raw"
"github.com/prometheus/prometheus/storage/raw/leveldb"
dto "github.com/prometheus/prometheus/model/generated"
)
const curationYieldPeriod = 250 * time.Millisecond
var errIllegalIterator = errors.New("Iterator invalid.")
// CurationStateUpdater receives updates about the curation state.
type CurationStateUpdater interface {
UpdateCurationState(*CurationState)
}
// CurationState contains high-level curation state information for the
// heads-up-display.
type CurationState struct {
Active bool
Name string
Limit time.Duration
Fingerprint *clientmodel.Fingerprint
}
type CuratorOptions struct {
// Stop functions as a channel that when empty allows the curator to operate.
// The moment a value is ingested inside of it, the curator goes into drain
// mode.
Stop chan bool
ViewQueue chan viewJob
}
// curator is responsible for effectuating a given curation policy across the
// stored samples on-disk. This is useful to compact sparse sample values into
// single sample entities to reduce keyspace load on the datastore.
type Curator struct {
stop chan bool
viewQueue chan viewJob
dtoSampleKeys *dtoSampleKeyList
sampleKeys *sampleKeyList
}
func NewCurator(o *CuratorOptions) *Curator {
return &Curator{
stop: o.Stop,
viewQueue: o.ViewQueue,
dtoSampleKeys: newDtoSampleKeyList(10),
sampleKeys: newSampleKeyList(10),
}
}
// watermarkScanner converts (dto.Fingerprint, dto.MetricHighWatermark) doubles
// into (model.Fingerprint, model.Watermark) doubles.
//
// watermarkScanner determines whether to include or exclude candidate
// values from the curation process by virtue of how old the high watermark is.
//
// watermarkScanner scans over the curator.samples table for metrics whose
// high watermark has been determined to be allowable for curation. This type
// is individually responsible for compaction.
//
// The scanning starts from CurationRemark.LastCompletionTimestamp and goes
// forward until the stop point or end of the series is reached.
type watermarkScanner struct {
// curationState is the data store for curation remarks.
curationState CurationRemarker
// ignoreYoungerThan is passed into the curation remark for the given series.
ignoreYoungerThan time.Duration
// processor is responsible for executing a given stategy on the
// to-be-operated-on series.
processor Processor
// sampleIterator is a snapshotted iterator for the time series.
sampleIterator leveldb.Iterator
// samples
samples raw.Persistence
// stopAt is a cue for when to stop mutating a given series.
stopAt time.Time
// stop functions as the global stop channel for all future operations.
stop chan bool
// status is the outbound channel for notifying the status page of its state.
status CurationStateUpdater
firstBlock, lastBlock *SampleKey
ViewQueue chan viewJob
dtoSampleKeys *dtoSampleKeyList
sampleKeys *sampleKeyList
}
// run facilitates the curation lifecycle.
//
// recencyThreshold represents the most recent time up to which values will be
// curated.
// curationState is the on-disk store where the curation remarks are made for
// how much progress has been made.
func (c *Curator) Run(ignoreYoungerThan time.Duration, instant time.Time, processor Processor, curationState CurationRemarker, samples *leveldb.LevelDBPersistence, watermarks HighWatermarker, status CurationStateUpdater) (err error) {
defer func(t time.Time) {
duration := float64(time.Since(t) / time.Millisecond)
labels := map[string]string{
cutOff: fmt.Sprint(ignoreYoungerThan),
processorName: processor.Name(),
result: success,
}
if err != nil {
labels[result] = failure
}
curationDuration.IncrementBy(labels, duration)
curationDurations.Add(labels, duration)
}(time.Now())
defer status.UpdateCurationState(&CurationState{Active: false})
iterator, err := samples.NewIterator(true)
if err != nil {
return err
}
defer iterator.Close()
if !iterator.SeekToLast() {
glog.Info("Empty database; skipping curation.")
return
}
keyDto, _ := c.dtoSampleKeys.Get()
defer c.dtoSampleKeys.Give(keyDto)
lastBlock, _ := c.sampleKeys.Get()
defer c.sampleKeys.Give(lastBlock)
if err := iterator.Key(keyDto); err != nil {
panic(err)
}
lastBlock.Load(keyDto)
if !iterator.SeekToFirst() {
glog.Info("Empty database; skipping curation.")
return
}
firstBlock, _ := c.sampleKeys.Get()
defer c.sampleKeys.Give(firstBlock)
if err := iterator.Key(keyDto); err != nil {
panic(err)
}
firstBlock.Load(keyDto)
scanner := &watermarkScanner{
curationState: curationState,
ignoreYoungerThan: ignoreYoungerThan,
processor: processor,
status: status,
stop: c.stop,
stopAt: instant.Add(-1 * ignoreYoungerThan),
sampleIterator: iterator,
samples: samples,
firstBlock: firstBlock,
lastBlock: lastBlock,
ViewQueue: c.viewQueue,
dtoSampleKeys: c.dtoSampleKeys,
sampleKeys: c.sampleKeys,
}
// Right now, the ability to stop a curation is limited to the beginning of
// each fingerprint cycle. It is impractical to cease the work once it has
// begun for a given series.
_, err = watermarks.ForEach(scanner, scanner, scanner)
return
}
// drain instructs the curator to stop at the next convenient moment as to not
// introduce data inconsistencies.
func (c *Curator) Drain() {
if len(c.stop) == 0 {
c.stop <- true
}
}
func (c *Curator) Close() {
c.dtoSampleKeys.Close()
c.sampleKeys.Close()
}
func (w *watermarkScanner) DecodeKey(in interface{}) (interface{}, error) {
key := new(dto.Fingerprint)
bytes := in.([]byte)
if err := proto.Unmarshal(bytes, key); err != nil {
return nil, err
}
fingerprint := new(clientmodel.Fingerprint)
loadFingerprint(fingerprint, key)
return fingerprint, nil
}
func (w *watermarkScanner) DecodeValue(in interface{}) (interface{}, error) {
value := new(dto.MetricHighWatermark)
bytes := in.([]byte)
if err := proto.Unmarshal(bytes, value); err != nil {
return nil, err
}
watermark := new(watermarks)
watermark.load(value)
return watermark, nil
}
func (w *watermarkScanner) shouldStop() bool {
return len(w.stop) != 0
}
func (w *watermarkScanner) Filter(key, value interface{}) (r storage.FilterResult) {
fingerprint := key.(*clientmodel.Fingerprint)
defer func() {
labels := map[string]string{
cutOff: fmt.Sprint(w.ignoreYoungerThan),
result: strings.ToLower(r.String()),
processorName: w.processor.Name(),
}
curationFilterOperations.Increment(labels)
w.status.UpdateCurationState(&CurationState{
Active: true,
Name: w.processor.Name(),
Limit: w.ignoreYoungerThan,
Fingerprint: fingerprint,
})
}()
if w.shouldStop() {
return storage.STOP
}
k := &curationKey{
Fingerprint: fingerprint,
ProcessorMessageRaw: w.processor.Signature(),
ProcessorMessageTypeName: w.processor.Name(),
IgnoreYoungerThan: w.ignoreYoungerThan,
}
curationRemark, present, err := w.curationState.Get(k)
if err != nil {
return
}
if !present {
return storage.ACCEPT
}
if !curationRemark.Before(w.stopAt) {
return storage.SKIP
}
watermark := value.(*watermarks)
if !curationRemark.Before(watermark.High) {
return storage.SKIP
}
curationConsistent, err := w.curationConsistent(fingerprint, watermark)
if err != nil {
return
}
if curationConsistent {
return storage.SKIP
}
return storage.ACCEPT
}
// curationConsistent determines whether the given metric is in a dirty state
// and needs curation.
func (w *watermarkScanner) curationConsistent(f *clientmodel.Fingerprint, watermark *watermarks) (bool, error) {
k := &curationKey{
Fingerprint: f,
ProcessorMessageRaw: w.processor.Signature(),
ProcessorMessageTypeName: w.processor.Name(),
IgnoreYoungerThan: w.ignoreYoungerThan,
}
curationRemark, present, err := w.curationState.Get(k)
if err != nil {
return false, err
}
if !present {
return false, nil
}
if !curationRemark.Before(watermark.High) {
return true, nil
}
return false, nil
}
func (w *watermarkScanner) Operate(key, _ interface{}) (oErr *storage.OperatorError) {
fingerprint := key.(*clientmodel.Fingerprint)
glog.Infof("Curating %s...", fingerprint)
if len(w.ViewQueue) > 0 {
glog.Warning("Deferred due to view queue.")
time.Sleep(curationYieldPeriod)
}
if fingerprint.Less(w.firstBlock.Fingerprint) {
glog.Warning("Skipped since before keyspace.")
return nil
}
if w.lastBlock.Fingerprint.Less(fingerprint) {
glog.Warning("Skipped since after keyspace.")
return nil
}
curationState, _, err := w.curationState.Get(&curationKey{
Fingerprint: fingerprint,
ProcessorMessageRaw: w.processor.Signature(),
ProcessorMessageTypeName: w.processor.Name(),
IgnoreYoungerThan: w.ignoreYoungerThan,
})
if err != nil {
glog.Warning("Unable to get curation state: %s", err)
// An anomaly with the curation remark is likely not fatal in the sense that
// there was a decoding error with the entity and shouldn't be cause to stop
// work. The process will simply start from a pessimistic work time and
// work forward. With an idempotent processor, this is safe.
return &storage.OperatorError{error: err, Continuable: true}
}
keySet, _ := w.sampleKeys.Get()
defer w.sampleKeys.Give(keySet)
keySet.Fingerprint = fingerprint
keySet.FirstTimestamp = curationState
// Invariant: The fingerprint tests above ensure that we have the same
// fingerprint.
keySet.Constrain(w.firstBlock, w.lastBlock)
seeker := &iteratorSeekerState{
i: w.sampleIterator,
obj: keySet,
first: w.firstBlock,
last: w.lastBlock,
dtoSampleKeys: w.dtoSampleKeys,
sampleKeys: w.sampleKeys,
}
for state := seeker.initialize; state != nil; state = state() {
}
if seeker.err != nil {
glog.Warningf("Got error in state machine: %s", seeker.err)
return &storage.OperatorError{error: seeker.err, Continuable: !seeker.iteratorInvalid}
}
if seeker.iteratorInvalid {
glog.Warningf("Got illegal iterator in state machine: %s", err)
return &storage.OperatorError{error: errIllegalIterator, Continuable: false}
}
if !seeker.seriesOperable {
return
}
lastTime, err := w.processor.Apply(w.sampleIterator, w.samples, w.stopAt, fingerprint)
if err != nil {
// We can't divine the severity of a processor error without refactoring the
// interface.
return &storage.OperatorError{error: err, Continuable: false}
}
if err = w.curationState.Update(&curationKey{
Fingerprint: fingerprint,
ProcessorMessageRaw: w.processor.Signature(),
ProcessorMessageTypeName: w.processor.Name(),
IgnoreYoungerThan: w.ignoreYoungerThan,
}, lastTime); err != nil {
// Under the assumption that the processors are idempotent, they can be
// re-run; thusly, the commitment of the curation remark is no cause
// to cease further progress.
return &storage.OperatorError{error: err, Continuable: true}
}
return nil
}
// curationKey provides a representation of dto.CurationKey with associated
// business logic methods attached to it to enhance code readability.
type curationKey struct {
Fingerprint *clientmodel.Fingerprint
ProcessorMessageRaw []byte
ProcessorMessageTypeName string
IgnoreYoungerThan time.Duration
}
// Equal answers whether the two curationKeys are equivalent.
func (c *curationKey) Equal(o *curationKey) bool {
switch {
case !c.Fingerprint.Equal(o.Fingerprint):
return false
case bytes.Compare(c.ProcessorMessageRaw, o.ProcessorMessageRaw) != 0:
return false
case c.ProcessorMessageTypeName != o.ProcessorMessageTypeName:
return false
case c.IgnoreYoungerThan != o.IgnoreYoungerThan:
return false
}
return true
}
func (c *curationKey) dump(d *dto.CurationKey) {
d.Reset()
// BUG(matt): Avenue for simplification.
fingerprintDTO := &dto.Fingerprint{}
dumpFingerprint(fingerprintDTO, c.Fingerprint)
d.Fingerprint = fingerprintDTO
d.ProcessorMessageRaw = c.ProcessorMessageRaw
d.ProcessorMessageTypeName = proto.String(c.ProcessorMessageTypeName)
d.IgnoreYoungerThan = proto.Int64(int64(c.IgnoreYoungerThan))
}
func (c *curationKey) load(d *dto.CurationKey) {
// BUG(matt): Avenue for simplification.
c.Fingerprint = &clientmodel.Fingerprint{}
loadFingerprint(c.Fingerprint, d.Fingerprint)
c.ProcessorMessageRaw = d.ProcessorMessageRaw
c.ProcessorMessageTypeName = d.GetProcessorMessageTypeName()
c.IgnoreYoungerThan = time.Duration(d.GetIgnoreYoungerThan())
}