Update low-level i'faces to reflect wireformats.

This commit fixes a critique of the old storage API design, whereby
the input parameters were always as raw bytes and never Protocol
Buffer messages that encapsulated the data, meaning every place a
read or mutation was conducted needed to manually perform said
translations on its own.  This is taxing.

Change-Id: I4786938d0d207cefb7782bd2bd96a517eead186f
This commit is contained in:
Matt T. Proud 2013-08-29 15:15:22 +02:00 committed by Matt T. Proud
parent 7910f6e863
commit 4a87c002e8
19 changed files with 1047 additions and 251 deletions

56
main.go
View file

@ -80,7 +80,7 @@ type prometheus struct {
tailCompactionTimer *time.Ticker
deletionTimer *time.Ticker
curationMutex sync.Mutex
curationSema chan bool
stopBackgroundOperations chan bool
unwrittenSamples chan *extraction.Result
@ -104,41 +104,62 @@ func (p *prometheus) interruptHandler() {
}
func (p *prometheus) compact(olderThan time.Duration, groupSize int) error {
p.curationMutex.Lock()
defer p.curationMutex.Unlock()
processor := &metric.CompactionProcessor{
MaximumMutationPoolBatch: groupSize * 3,
MinimumGroupSize: groupSize,
select {
case p.curationSema <- true:
default:
glog.Warningf("Deferred compaction for %s and %s due to existing operation.", operation, groupSize)
return
}
curator := metric.Curator{
defer func() {
<-p.curationSema
}()
processor := metric.NewCompactionProcessor(&metric.CompactionProcessorOptions{
MaximumMutationPoolBatch: groupSize * 3,
MinimumGroupSize: groupSize,
})
defer processor.Close()
curator := metric.NewCurator(&metric.CuratorOptions{
Stop: p.stopBackgroundOperations,
ViewQueue: p.storage.ViewQueue,
}
})
defer curator.Close()
return curator.Run(olderThan, time.Now(), processor, p.storage.DiskStorage.CurationRemarks, p.storage.DiskStorage.MetricSamples, p.storage.DiskStorage.MetricHighWatermarks, p.curationState)
}
func (p *prometheus) delete(olderThan time.Duration, batchSize int) error {
p.curationMutex.Lock()
defer p.curationMutex.Unlock()
processor := &metric.DeletionProcessor{
MaximumMutationPoolBatch: batchSize,
select {
case p.curationSema <- true:
default:
glog.Warningf("Deferred compaction for %s and %s due to existing operation.", operation, groupSize)
return
}
curator := metric.Curator{
processor := metric.NewDeletionProcessor(&metric.DeletionProcessorOptions{
MaximumMutationPoolBatch: batchSize,
})
defer processor.Close()
curator := metric.NewCurator(&metric.CuratorOptions{
Stop: p.stopBackgroundOperations,
ViewQueue: p.storage.ViewQueue,
}
})
defer curator.Close()
return curator.Run(olderThan, time.Now(), processor, p.storage.DiskStorage.CurationRemarks, p.storage.DiskStorage.MetricSamples, p.storage.DiskStorage.MetricHighWatermarks, p.curationState)
}
func (p *prometheus) close() {
select {
case p.curationSema <- true:
default:
}
if p.headCompactionTimer != nil {
p.headCompactionTimer.Stop()
}
@ -156,8 +177,6 @@ func (p *prometheus) close() {
p.stopBackgroundOperations <- true
}
p.curationMutex.Lock()
p.ruleManager.Stop()
p.storage.Close()
@ -267,6 +286,7 @@ func main() {
deletionTimer: deletionTimer,
curationState: prometheusStatus,
curationSema: make(chan bool, 1),
unwrittenSamples: unwrittenSamples,

View file

@ -15,6 +15,7 @@ package metric
import (
"bytes"
"errors"
"fmt"
"strings"
"time"
@ -24,7 +25,6 @@ import (
clientmodel "github.com/prometheus/client_golang/model"
"github.com/prometheus/prometheus/coding"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/storage/raw"
"github.com/prometheus/prometheus/storage/raw/leveldb"
@ -34,6 +34,8 @@ import (
const curationYieldPeriod = 250 * time.Millisecond
var errIllegalIterator = errors.New("Iterator invalid.")
// CurationStateUpdater receives updates about the curation state.
type CurationStateUpdater interface {
UpdateCurationState(*CurationState)
@ -48,10 +50,7 @@ type CurationState struct {
Fingerprint *clientmodel.Fingerprint
}
// curator is responsible for effectuating a given curation policy across the
// stored samples on-disk. This is useful to compact sparse sample values into
// single sample entities to reduce keyspace load on the datastore.
type Curator struct {
type CuratorOptions struct {
// Stop functions as a channel that when empty allows the curator to operate.
// The moment a value is ingested inside of it, the curator goes into drain
// mode.
@ -60,6 +59,29 @@ type Curator struct {
ViewQueue chan viewJob
}
// curator is responsible for effectuating a given curation policy across the
// stored samples on-disk. This is useful to compact sparse sample values into
// single sample entities to reduce keyspace load on the datastore.
type Curator struct {
stop chan bool
viewQueue chan viewJob
dtoSampleKeys *dtoSampleKeyList
sampleKeys *sampleKeyList
}
func NewCurator(o *CuratorOptions) *Curator {
return &Curator{
stop: o.Stop,
viewQueue: o.ViewQueue,
dtoSampleKeys: newDtoSampleKeyList(10),
sampleKeys: newSampleKeyList(10),
}
}
// watermarkScanner converts (dto.Fingerprint, dto.MetricHighWatermark) doubles
// into (model.Fingerprint, model.Watermark) doubles.
//
@ -95,6 +117,9 @@ type watermarkScanner struct {
firstBlock, lastBlock *SampleKey
ViewQueue chan viewJob
dtoSampleKeys *dtoSampleKeyList
sampleKeys *sampleKeyList
}
// run facilitates the curation lifecycle.
@ -122,7 +147,10 @@ func (c *Curator) Run(ignoreYoungerThan time.Duration, instant time.Time, proces
defer status.UpdateCurationState(&CurationState{Active: false})
iterator := samples.NewIterator(true)
iterator, err := samples.NewIterator(true)
if err != nil {
return err
}
defer iterator.Close()
if !iterator.SeekToLast() {
@ -130,21 +158,40 @@ func (c *Curator) Run(ignoreYoungerThan time.Duration, instant time.Time, proces
return
}
lastBlock, _ := extractSampleKey(iterator)
keyDto, _ := c.dtoSampleKeys.Get()
defer c.dtoSampleKeys.Give(keyDto)
lastBlock, _ := c.sampleKeys.Get()
defer c.sampleKeys.Give(lastBlock)
if err := iterator.Key(keyDto); err != nil {
panic(err)
}
lastBlock.Load(keyDto)
if !iterator.SeekToFirst() {
glog.Info("Empty database; skipping curation.")
return
}
firstBlock, _ := extractSampleKey(iterator)
firstBlock, _ := c.sampleKeys.Get()
defer c.sampleKeys.Give(firstBlock)
if err := iterator.Key(keyDto); err != nil {
panic(err)
}
firstBlock.Load(keyDto)
scanner := &watermarkScanner{
curationState: curationState,
ignoreYoungerThan: ignoreYoungerThan,
processor: processor,
status: status,
stop: c.Stop,
stop: c.stop,
stopAt: instant.Add(-1 * ignoreYoungerThan),
sampleIterator: iterator,
@ -153,7 +200,10 @@ func (c *Curator) Run(ignoreYoungerThan time.Duration, instant time.Time, proces
firstBlock: firstBlock,
lastBlock: lastBlock,
ViewQueue: c.ViewQueue,
ViewQueue: c.viewQueue,
dtoSampleKeys: c.dtoSampleKeys,
sampleKeys: c.sampleKeys,
}
// Right now, the ability to stop a curation is limited to the beginning of
@ -167,11 +217,16 @@ func (c *Curator) Run(ignoreYoungerThan time.Duration, instant time.Time, proces
// drain instructs the curator to stop at the next convenient moment as to not
// introduce data inconsistencies.
func (c *Curator) Drain() {
if len(c.Stop) == 0 {
c.Stop <- true
if len(c.stop) == 0 {
c.stop <- true
}
}
func (c *Curator) Close() {
c.dtoSampleKeys.Close()
c.sampleKeys.Close()
}
func (w *watermarkScanner) DecodeKey(in interface{}) (interface{}, error) {
key := new(dto.Fingerprint)
bytes := in.([]byte)
@ -284,26 +339,32 @@ func (w *watermarkScanner) curationConsistent(f *clientmodel.Fingerprint, waterm
}
func (w *watermarkScanner) Operate(key, _ interface{}) (oErr *storage.OperatorError) {
fingerprint := key.(*clientmodel.Fingerprint)
glog.Infof("Curating %s...", fingerprint)
if len(w.ViewQueue) > 0 {
glog.Warning("Deferred due to view queue.")
time.Sleep(curationYieldPeriod)
}
fingerprint := key.(*clientmodel.Fingerprint)
if fingerprint.Less(w.firstBlock.Fingerprint) {
glog.Warning("Skipped since before keyspace.")
return nil
}
if w.lastBlock.Fingerprint.Less(fingerprint) {
glog.Warning("Skipped since after keyspace.")
return nil
}
curationState, present, err := w.curationState.Get(&curationKey{
curationState, _, err := w.curationState.Get(&curationKey{
Fingerprint: fingerprint,
ProcessorMessageRaw: w.processor.Signature(),
ProcessorMessageTypeName: w.processor.Name(),
IgnoreYoungerThan: w.ignoreYoungerThan,
})
if err != nil {
glog.Warning("Unable to get curation state: %s", err)
// An anomaly with the curation remark is likely not fatal in the sense that
// there was a decoding error with the entity and shouldn't be cause to stop
// work. The process will simply start from a pessimistic work time and
@ -311,47 +372,45 @@ func (w *watermarkScanner) Operate(key, _ interface{}) (oErr *storage.OperatorEr
return &storage.OperatorError{error: err, Continuable: true}
}
keySet := &SampleKey{
Fingerprint: fingerprint,
keySet, _ := w.sampleKeys.Get()
defer w.sampleKeys.Give(keySet)
keySet.Fingerprint = fingerprint
keySet.FirstTimestamp = curationState
// Invariant: The fingerprint tests above ensure that we have the same
// fingerprint.
keySet.Constrain(w.firstBlock, w.lastBlock)
seeker := &iteratorSeekerState{
i: w.sampleIterator,
obj: keySet,
first: w.firstBlock,
last: w.lastBlock,
dtoSampleKeys: w.dtoSampleKeys,
sampleKeys: w.sampleKeys,
}
if !present && fingerprint.Equal(w.firstBlock.Fingerprint) {
// If the fingerprint is the same, then we simply need to use the earliest
// block found in the database.
*keySet = *w.firstBlock
} else if present {
keySet.FirstTimestamp = curationState
for state := seeker.initialize; state != nil; state = state() {
}
dto := new(dto.SampleKey)
keySet.Dump(dto)
prospectiveKey := coding.NewPBEncoder(dto).MustEncode()
if !w.sampleIterator.Seek(prospectiveKey) {
// LevelDB is picky about the seek ranges. If an iterator was invalidated,
// no work may occur, and the iterator cannot be recovered.
return &storage.OperatorError{error: fmt.Errorf("Illegal Condition: Iterator invalidated due to seek range."), Continuable: false}
if seeker.err != nil {
glog.Warningf("Got error in state machine: %s", seeker.err)
return &storage.OperatorError{error: seeker.err, Continuable: !seeker.iteratorInvalid}
}
for {
sampleKey, err := extractSampleKey(w.sampleIterator)
if err != nil {
return
}
if !sampleKey.Fingerprint.Equal(fingerprint) {
return
}
if seeker.iteratorInvalid {
glog.Warningf("Got illegal iterator in state machine: %s", err)
if !present {
break
}
return &storage.OperatorError{error: errIllegalIterator, Continuable: false}
}
if !(sampleKey.FirstTimestamp.Before(curationState) && sampleKey.LastTimestamp.Before(curationState)) {
break
}
if !w.sampleIterator.Next() {
return
}
if !seeker.seriesOperable {
return
}
lastTime, err := w.processor.Apply(w.sampleIterator, w.samples, w.stopAt, fingerprint)

182
storage/metric/freelist.go Normal file
View file

@ -0,0 +1,182 @@
// Copyright 2013 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package metric
import (
"github.com/prometheus/prometheus/utility"
dto "github.com/prometheus/prometheus/model/generated"
)
type dtoSampleKeyList struct {
l utility.FreeList
}
func newDtoSampleKeyList(cap int) *dtoSampleKeyList {
return &dtoSampleKeyList{
l: utility.NewFreeList(cap),
}
}
func (l *dtoSampleKeyList) Get() (*dto.SampleKey, bool) {
if v, ok := l.l.Get(); ok {
return v.(*dto.SampleKey), ok
}
return new(dto.SampleKey), false
}
func (l *dtoSampleKeyList) Give(v *dto.SampleKey) bool {
v.Reset()
return l.l.Give(v)
}
func (l *dtoSampleKeyList) Close() {
l.l.Close()
}
type sampleKeyList struct {
l utility.FreeList
}
var defaultSampleKey = new(SampleKey)
func newSampleKeyList(cap int) *sampleKeyList {
return &sampleKeyList{
l: utility.NewFreeList(cap),
}
}
func (l *sampleKeyList) Get() (*SampleKey, bool) {
if v, ok := l.l.Get(); ok {
return v.(*SampleKey), ok
}
return new(SampleKey), false
}
func (l *sampleKeyList) Give(v *SampleKey) bool {
*v = *defaultSampleKey
return l.l.Give(v)
}
func (l *sampleKeyList) Close() {
l.l.Close()
}
type valueAtTimeList struct {
l utility.FreeList
}
func (l *valueAtTimeList) Get() (*getValuesAtTimeOp, bool) {
if v, ok := l.l.Get(); ok {
return v.(*getValuesAtTimeOp), ok
}
return new(getValuesAtTimeOp), false
}
var pGetValuesAtTimeOp = new(getValuesAtTimeOp)
func (l *valueAtTimeList) Give(v *getValuesAtTimeOp) bool {
*v = *pGetValuesAtTimeOp
return l.l.Give(v)
}
func newValueAtTimeList(cap int) *valueAtTimeList {
return &valueAtTimeList{
l: utility.NewFreeList(cap),
}
}
type valueAtIntervalList struct {
l utility.FreeList
}
func (l *valueAtIntervalList) Get() (*getValuesAtIntervalOp, bool) {
if v, ok := l.l.Get(); ok {
return v.(*getValuesAtIntervalOp), ok
}
return new(getValuesAtIntervalOp), false
}
var pGetValuesAtIntervalOp = new(getValuesAtIntervalOp)
func (l *valueAtIntervalList) Give(v *getValuesAtIntervalOp) bool {
*v = *pGetValuesAtIntervalOp
return l.l.Give(v)
}
func newValueAtIntervalList(cap int) *valueAtIntervalList {
return &valueAtIntervalList{
l: utility.NewFreeList(cap),
}
}
type valueAlongRangeList struct {
l utility.FreeList
}
func (l *valueAlongRangeList) Get() (*getValuesAlongRangeOp, bool) {
if v, ok := l.l.Get(); ok {
return v.(*getValuesAlongRangeOp), ok
}
return new(getValuesAlongRangeOp), false
}
var pGetValuesAlongRangeOp = new(getValuesAlongRangeOp)
func (l *valueAlongRangeList) Give(v *getValuesAlongRangeOp) bool {
*v = *pGetValuesAlongRangeOp
return l.l.Give(v)
}
func newValueAlongRangeList(cap int) *valueAlongRangeList {
return &valueAlongRangeList{
l: utility.NewFreeList(cap),
}
}
type valueAtIntervalAlongRangeList struct {
l utility.FreeList
}
func (l *valueAtIntervalAlongRangeList) Get() (*getValueRangeAtIntervalOp, bool) {
if v, ok := l.l.Get(); ok {
return v.(*getValueRangeAtIntervalOp), ok
}
return new(getValueRangeAtIntervalOp), false
}
var pGetValueRangeAtIntervalOp = new(getValueRangeAtIntervalOp)
func (l *valueAtIntervalAlongRangeList) Give(v *getValueRangeAtIntervalOp) bool {
*v = *pGetValueRangeAtIntervalOp
return l.l.Give(v)
}
func newValueAtIntervalAlongRangeList(cap int) *valueAtIntervalAlongRangeList {
return &valueAtIntervalAlongRangeList{
l: utility.NewFreeList(cap),
}
}

View file

@ -402,8 +402,7 @@ func (l *LevelDBMetricPersistence) AppendSamples(samples clientmodel.Samples) (e
func extractSampleKey(i leveldb.Iterator) (*SampleKey, error) {
k := &dto.SampleKey{}
err := proto.Unmarshal(i.Key(), k)
if err != nil {
if err := i.Key(k); err != nil {
return nil, err
}
@ -415,8 +414,7 @@ func extractSampleKey(i leveldb.Iterator) (*SampleKey, error) {
func extractSampleValues(i leveldb.Iterator) (Values, error) {
v := &dto.SampleValueSeries{}
err := proto.Unmarshal(i.Value(), v)
if err != nil {
if err := i.Value(v); err != nil {
return nil, err
}

212
storage/metric/objective.go Normal file
View file

@ -0,0 +1,212 @@
// Copyright 2013 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package metric
import (
"fmt"
"github.com/golang/glog"
"github.com/prometheus/prometheus/storage/raw/leveldb"
dto "github.com/prometheus/prometheus/model/generated"
)
type iteratorSeekerState struct {
// Immutable State
i leveldb.Iterator
obj *SampleKey
first, last *SampleKey
dtoSampleKeys *dtoSampleKeyList
sampleKeys *sampleKeyList
// Mutable State
iteratorInvalid bool
seriesOperable bool
err error
key *SampleKey
keyDto *dto.SampleKey
}
// iteratorSeeker is a function that models a state machine state and
// is responsible for choosing the subsequent state given the present
// disposition.
//
// It returns the next state or nil if no remaining transition is possible.
// It returns an error if one occurred and finally a truth value indicating
// whether the current iterator state is usable and whether it can proceed with
// the current fingerprint.
type iteratorSeeker func() iteratorSeeker
func (s *iteratorSeekerState) initialize() iteratorSeeker {
s.key, _ = s.sampleKeys.Get()
s.keyDto, _ = s.dtoSampleKeys.Get()
return s.start
}
func (s *iteratorSeekerState) destroy() iteratorSeeker {
s.sampleKeys.Give(s.key)
s.dtoSampleKeys.Give(s.keyDto)
return nil
}
func (s *iteratorSeekerState) start() iteratorSeeker {
switch {
case s.obj.Fingerprint.Less(s.first.Fingerprint):
// The fingerprint does not exist in the database.
return s.destroy
case s.last.Fingerprint.Less(s.obj.Fingerprint):
// The fingerprint does not exist in the database.
return s.destroy
case s.obj.Fingerprint.Equal(s.first.Fingerprint) && s.obj.FirstTimestamp.Before(s.first.FirstTimestamp):
// The fingerprint is the first fingerprint, but we've requested a value
// before what exists in the database.
return s.seekBeginning
case s.last.Before(s.obj.Fingerprint, s.obj.FirstTimestamp):
// The requested time for work is after the last sample in the database; we
// can't do anything!
return s.destroy
default:
return s.initialSeek
}
}
func (s *iteratorSeekerState) seekBeginning() iteratorSeeker {
s.i.SeekToFirst()
if !s.i.Valid() {
s.err = s.i.Error()
// If we can't seek to the beginning, there isn't any hope for us.
glog.Warning("iterator went bad: %s", s.err)
s.iteratorInvalid = true
return s.destroy
}
return s.initialMatchFingerprint
}
func (s *iteratorSeekerState) initialSeek() iteratorSeeker {
s.obj.Dump(s.keyDto)
s.i.Seek(s.keyDto)
if !s.i.Valid() {
s.err = s.i.Error()
glog.Warningf("iterator went bad %s", s.err)
s.iteratorInvalid = true
return s.destroy
}
return s.initialMatchFingerprint
}
func (s *iteratorSeekerState) initialMatchFingerprint() iteratorSeeker {
if err := s.i.Key(s.keyDto); err != nil {
s.err = err
return s.destroy
}
s.key.Load(s.keyDto)
switch {
case s.obj.Fingerprint.Less(s.key.Fingerprint):
return s.initialFingerprintOvershot
case s.key.Fingerprint.Less(s.obj.Fingerprint):
panic("violated invariant")
default:
return s.initialMatchTime
}
}
func (s *iteratorSeekerState) initialFingerprintOvershot() iteratorSeeker {
s.i.Previous()
if !s.i.Valid() {
glog.Warningf("Could not backtrack for %s", s)
panic("violated invariant")
}
if err := s.i.Key(s.keyDto); err != nil {
s.err = err
return s.destroy
}
s.key.Load(s.keyDto)
if !s.key.Fingerprint.Equal(s.obj.Fingerprint) {
return s.destroy
}
return s.initialMatchTime
}
func (s *iteratorSeekerState) initialMatchTime() iteratorSeeker {
switch {
case s.key.MayContain(s.obj.FirstTimestamp):
s.seriesOperable = true
return s.destroy
case s.key.Equal(s.first), s.obj.FirstTimestamp.Equal(s.key.FirstTimestamp):
s.seriesOperable = true
return s.destroy
case s.obj.FirstTimestamp.Before(s.key.FirstTimestamp):
return s.reCue
default:
panic("violated invariant " + fmt.Sprintln(s.obj, s.key))
}
}
func (s *iteratorSeekerState) reCue() iteratorSeeker {
s.i.Previous()
if !s.i.Valid() {
glog.Warningf("Could not backtrack for %s", s)
panic("violated invariant")
}
if err := s.i.Key(s.keyDto); err != nil {
s.err = err
return s.destroy
}
s.key.Load(s.keyDto)
if !s.key.Fingerprint.Equal(s.obj.Fingerprint) {
return s.fastForward
}
s.seriesOperable = true
return s.destroy
}
func (s *iteratorSeekerState) fastForward() iteratorSeeker {
s.i.Next()
if !s.i.Valid() {
glog.Warningf("Could not fast-forward for %s", s)
panic("violated invariant")
}
s.seriesOperable = true
return s.destroy
}

View file

@ -50,18 +50,14 @@ type Processor interface {
// CompactionProcessor combines sparse values in the database together such
// that at least MinimumGroupSize-sized chunks are grouped together.
type CompactionProcessor struct {
// MaximumMutationPoolBatch represents approximately the largest pending
// batch of mutation operations for the database before pausing to
// commit before resumption.
//
// A reasonable value would be (MinimumGroupSize * 2) + 1.
MaximumMutationPoolBatch int
// MinimumGroupSize represents the smallest allowed sample chunk size in the
// database.
MinimumGroupSize int
maximumMutationPoolBatch int
minimumGroupSize int
// signature is the byte representation of the CompactionProcessor's settings,
// used for purely memoization purposes across an instance.
signature []byte
dtoSampleKeys *dtoSampleKeyList
sampleKeys *sampleKeyList
}
func (p *CompactionProcessor) Name() string {
@ -71,7 +67,7 @@ func (p *CompactionProcessor) Name() string {
func (p *CompactionProcessor) Signature() []byte {
if len(p.signature) == 0 {
out, err := proto.Marshal(&dto.CompactionProcessorDefinition{
MinimumGroupSize: proto.Uint32(uint32(p.MinimumGroupSize)),
MinimumGroupSize: proto.Uint32(uint32(p.minimumGroupSize)),
})
if err != nil {
panic(err)
@ -84,7 +80,7 @@ func (p *CompactionProcessor) Signature() []byte {
}
func (p *CompactionProcessor) String() string {
return fmt.Sprintf("compactionProcessor for minimum group size %d", p.MinimumGroupSize)
return fmt.Sprintf("compactionProcessor for minimum group size %d", p.minimumGroupSize)
}
func (p *CompactionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPersistence raw.Persistence, stopAt time.Time, fingerprint *clientmodel.Fingerprint) (lastCurated time.Time, err error) {
@ -98,16 +94,22 @@ func (p *CompactionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPers
var pendingMutations = 0
var pendingSamples Values
var sampleKey *SampleKey
var unactedSamples Values
var lastTouchedTime time.Time
var keyDropped bool
sampleKey, err = extractSampleKey(sampleIterator)
if err != nil {
sampleKey, _ := p.sampleKeys.Get()
defer p.sampleKeys.Give(sampleKey)
sampleKeyDto, _ := p.dtoSampleKeys.Get()
defer p.dtoSampleKeys.Give(sampleKeyDto)
if err = sampleIterator.Key(sampleKeyDto); err != nil {
return
}
sampleKey.Load(sampleKeyDto)
unactedSamples, err = extractSampleValues(sampleIterator)
if err != nil {
return
@ -129,10 +131,11 @@ func (p *CompactionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPers
keyDropped = false
sampleKey, err = extractSampleKey(sampleIterator)
if err != nil {
if err = sampleIterator.Key(sampleKeyDto); err != nil {
return
}
sampleKey.Load(sampleKeyDto)
unactedSamples, err = extractSampleValues(sampleIterator)
if err != nil {
return
@ -141,7 +144,7 @@ func (p *CompactionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPers
// If the number of pending mutations exceeds the allowed batch amount,
// commit to disk and delete the batch. A new one will be recreated if
// necessary.
case pendingMutations >= p.MaximumMutationPoolBatch:
case pendingMutations >= p.maximumMutationPoolBatch:
err = samplesPersistence.Commit(pendingBatch)
if err != nil {
return
@ -152,11 +155,11 @@ func (p *CompactionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPers
pendingBatch.Close()
pendingBatch = nil
case len(pendingSamples) == 0 && len(unactedSamples) >= p.MinimumGroupSize:
case len(pendingSamples) == 0 && len(unactedSamples) >= p.minimumGroupSize:
lastTouchedTime = unactedSamples[len(unactedSamples)-1].Timestamp
unactedSamples = Values{}
case len(pendingSamples)+len(unactedSamples) < p.MinimumGroupSize:
case len(pendingSamples)+len(unactedSamples) < p.minimumGroupSize:
if !keyDropped {
k := new(dto.SampleKey)
sampleKey.Dump(k)
@ -170,7 +173,7 @@ func (p *CompactionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPers
pendingMutations++
// If the number of pending writes equals the target group size
case len(pendingSamples) == p.MinimumGroupSize:
case len(pendingSamples) == p.minimumGroupSize:
k := new(dto.SampleKey)
newSampleKey := pendingSamples.ToSampleKey(fingerprint)
newSampleKey.Dump(k)
@ -187,9 +190,9 @@ func (p *CompactionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPers
keyDropped = true
}
if len(unactedSamples) > p.MinimumGroupSize {
pendingSamples = unactedSamples[:p.MinimumGroupSize]
unactedSamples = unactedSamples[p.MinimumGroupSize:]
if len(unactedSamples) > p.minimumGroupSize {
pendingSamples = unactedSamples[:p.minimumGroupSize]
unactedSamples = unactedSamples[p.minimumGroupSize:]
lastTouchedTime = unactedSamples[len(unactedSamples)-1].Timestamp
} else {
pendingSamples = unactedSamples
@ -198,14 +201,14 @@ func (p *CompactionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPers
}
}
case len(pendingSamples)+len(unactedSamples) >= p.MinimumGroupSize:
case len(pendingSamples)+len(unactedSamples) >= p.minimumGroupSize:
if !keyDropped {
k := new(dto.SampleKey)
sampleKey.Dump(k)
pendingBatch.Drop(k)
keyDropped = true
}
remainder := p.MinimumGroupSize - len(pendingSamples)
remainder := p.minimumGroupSize - len(pendingSamples)
pendingSamples = append(pendingSamples, unactedSamples[:remainder]...)
unactedSamples = unactedSamples[remainder:]
if len(unactedSamples) == 0 {
@ -244,15 +247,42 @@ func (p *CompactionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPers
return
}
// DeletionProcessor deletes sample blocks older than a defined value.
type DeletionProcessor struct {
func (p *CompactionProcessor) Close() {
p.dtoSampleKeys.Close()
p.sampleKeys.Close()
}
type CompactionProcessorOptions struct {
// MaximumMutationPoolBatch represents approximately the largest pending
// batch of mutation operations for the database before pausing to
// commit before resumption.
//
// A reasonable value would be (MinimumGroupSize * 2) + 1.
MaximumMutationPoolBatch int
// MinimumGroupSize represents the smallest allowed sample chunk size in the
// database.
MinimumGroupSize int
}
func NewCompactionProcessor(o *CompactionProcessorOptions) *CompactionProcessor {
return &CompactionProcessor{
maximumMutationPoolBatch: o.MaximumMutationPoolBatch,
minimumGroupSize: o.MinimumGroupSize,
dtoSampleKeys: newDtoSampleKeyList(10),
sampleKeys: newSampleKeyList(10),
}
}
// DeletionProcessor deletes sample blocks older than a defined value.
type DeletionProcessor struct {
maximumMutationPoolBatch int
// signature is the byte representation of the DeletionProcessor's settings,
// used for purely memoization purposes across an instance.
signature []byte
dtoSampleKeys *dtoSampleKeyList
sampleKeys *sampleKeyList
}
func (p *DeletionProcessor) Name() string {
@ -286,10 +316,16 @@ func (p *DeletionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPersis
}
}()
sampleKey, err := extractSampleKey(sampleIterator)
if err != nil {
sampleKeyDto, _ := p.dtoSampleKeys.Get()
defer p.dtoSampleKeys.Give(sampleKeyDto)
sampleKey, _ := p.sampleKeys.Get()
defer p.sampleKeys.Give(sampleKey)
if err = sampleIterator.Key(sampleKeyDto); err != nil {
return
}
sampleKey.Load(sampleKeyDto)
sampleValues, err := extractSampleValues(sampleIterator)
if err != nil {
@ -312,10 +348,11 @@ func (p *DeletionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPersis
return lastCurated, fmt.Errorf("Illegal Condition: Invalid Iterator on Continuation")
}
sampleKey, err = extractSampleKey(sampleIterator)
if err != nil {
if err = sampleIterator.Key(sampleKeyDto); err != nil {
return
}
sampleKey.Load(sampleKeyDto)
sampleValues, err = extractSampleValues(sampleIterator)
if err != nil {
return
@ -324,7 +361,7 @@ func (p *DeletionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPersis
// If the number of pending mutations exceeds the allowed batch amount,
// commit to disk and delete the batch. A new one will be recreated if
// necessary.
case pendingMutations >= p.MaximumMutationPoolBatch:
case pendingMutations >= p.maximumMutationPoolBatch:
err = samplesPersistence.Commit(pendingBatch)
if err != nil {
return
@ -379,3 +416,24 @@ func (p *DeletionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPersis
return
}
func (p *DeletionProcessor) Close() {
p.dtoSampleKeys.Close()
p.sampleKeys.Close()
}
type DeletionProcessorOptions struct {
// MaximumMutationPoolBatch represents approximately the largest pending
// batch of mutation operations for the database before pausing to
// commit before resumption.
MaximumMutationPoolBatch int
}
func NewDeletionProcessor(o *DeletionProcessorOptions) *DeletionProcessor {
return &DeletionProcessor{
maximumMutationPoolBatch: o.MaximumMutationPoolBatch,
dtoSampleKeys: newDtoSampleKeyList(10),
sampleKeys: newSampleKeyList(10),
}
}

View file

@ -123,10 +123,10 @@ func TestCuratorCompactionProcessor(t *testing.T) {
}{
{
in: in{
processor: &CompactionProcessor{
processor: NewCompactionProcessor(&CompactionProcessorOptions{
MinimumGroupSize: 5,
MaximumMutationPoolBatch: 15,
},
}),
ignoreYoungerThan: 1 * time.Hour,
groupSize: 5,
curationStates: fixture.Pairs{
@ -134,27 +134,27 @@ func TestCuratorCompactionProcessor(t *testing.T) {
fingerprint: "0001-A-1-Z",
ignoreYoungerThan: 1 * time.Hour,
lastCurated: testInstant.Add(-1 * 30 * time.Minute),
processor: &CompactionProcessor{
processor: NewCompactionProcessor(&CompactionProcessorOptions{
MinimumGroupSize: 5,
MaximumMutationPoolBatch: 15,
},
}),
},
curationState{
fingerprint: "0002-A-2-Z",
ignoreYoungerThan: 1 * time.Hour,
lastCurated: testInstant.Add(-1 * 90 * time.Minute),
processor: &CompactionProcessor{
processor: NewCompactionProcessor(&CompactionProcessorOptions{
MinimumGroupSize: 5,
MaximumMutationPoolBatch: 15,
},
}),
},
// This rule should effectively be ignored.
curationState{
fingerprint: "0002-A-2-Z",
processor: &CompactionProcessor{
processor: NewCompactionProcessor(&CompactionProcessorOptions{
MinimumGroupSize: 2,
MaximumMutationPoolBatch: 15,
},
}),
ignoreYoungerThan: 30 * time.Minute,
lastCurated: testInstant.Add(-1 * 90 * time.Minute),
},
@ -553,28 +553,28 @@ func TestCuratorCompactionProcessor(t *testing.T) {
fingerprint: "0001-A-1-Z",
ignoreYoungerThan: time.Hour,
lastCurated: testInstant.Add(-1 * 30 * time.Minute),
processor: &CompactionProcessor{
processor: NewCompactionProcessor(&CompactionProcessorOptions{
MinimumGroupSize: 5,
MaximumMutationPoolBatch: 15,
},
}),
},
{
fingerprint: "0002-A-2-Z",
ignoreYoungerThan: 30 * time.Minute,
lastCurated: testInstant.Add(-1 * 90 * time.Minute),
processor: &CompactionProcessor{
processor: NewCompactionProcessor(&CompactionProcessorOptions{
MinimumGroupSize: 2,
MaximumMutationPoolBatch: 15,
},
}),
},
{
fingerprint: "0002-A-2-Z",
ignoreYoungerThan: time.Hour,
lastCurated: testInstant.Add(-1 * 60 * time.Minute),
processor: &CompactionProcessor{
processor: NewCompactionProcessor(&CompactionProcessorOptions{
MinimumGroupSize: 5,
MaximumMutationPoolBatch: 15,
},
}),
},
},
sampleGroups: []sampleGroup{
@ -881,16 +881,20 @@ func TestCuratorCompactionProcessor(t *testing.T) {
stop := make(chan bool)
defer close(stop)
c := Curator{
c := NewCurator(&CuratorOptions{
Stop: stop,
}
})
defer c.Close()
err = c.Run(scenario.in.ignoreYoungerThan, testInstant, scenario.in.processor, curatorStates, samples, watermarkStates, updates)
if err != nil {
t.Fatal(err)
}
iterator := curatorStates.p.NewIterator(true)
iterator, err := curatorStates.p.NewIterator(true)
if err != nil {
t.Fatal(err)
}
defer iterator.Close()
for j, expected := range scenario.out.curationStates {
@ -905,9 +909,8 @@ func TestCuratorCompactionProcessor(t *testing.T) {
}
}
curationKeyDto := &dto.CurationKey{}
err = proto.Unmarshal(iterator.Key(), curationKeyDto)
curationKeyDto := new(dto.CurationKey)
err = iterator.Key(curationKeyDto)
if err != nil {
t.Fatalf("%d.%d. could not unmarshal: %s", i, j, err)
}
@ -938,7 +941,10 @@ func TestCuratorCompactionProcessor(t *testing.T) {
}
}
iterator = samples.NewIterator(true)
iterator, err = samples.NewIterator(true)
if err != nil {
t.Fatal(err)
}
defer iterator.Close()
for j, expected := range scenario.out.sampleGroups {
@ -1004,9 +1010,9 @@ func TestCuratorDeletionProcessor(t *testing.T) {
}{
{
in: in{
processor: &DeletionProcessor{
processor: NewDeletionProcessor(&DeletionProcessorOptions{
MaximumMutationPoolBatch: 15,
},
}),
ignoreYoungerThan: 1 * time.Hour,
groupSize: 5,
curationStates: fixture.Pairs{
@ -1014,17 +1020,17 @@ func TestCuratorDeletionProcessor(t *testing.T) {
fingerprint: "0001-A-1-Z",
ignoreYoungerThan: 1 * time.Hour,
lastCurated: testInstant.Add(-1 * 90 * time.Minute),
processor: &DeletionProcessor{
processor: NewDeletionProcessor(&DeletionProcessorOptions{
MaximumMutationPoolBatch: 15,
},
}),
},
curationState{
fingerprint: "0002-A-2-Z",
ignoreYoungerThan: 1 * time.Hour,
lastCurated: testInstant.Add(-1 * 90 * time.Minute),
processor: &DeletionProcessor{
processor: NewDeletionProcessor(&DeletionProcessorOptions{
MaximumMutationPoolBatch: 15,
},
}),
},
},
watermarkStates: fixture.Pairs{
@ -1317,17 +1323,17 @@ func TestCuratorDeletionProcessor(t *testing.T) {
fingerprint: "0001-A-1-Z",
ignoreYoungerThan: 1 * time.Hour,
lastCurated: testInstant.Add(-1 * 30 * time.Minute),
processor: &DeletionProcessor{
processor: NewDeletionProcessor(&DeletionProcessorOptions{
MaximumMutationPoolBatch: 15,
},
}),
},
{
fingerprint: "0002-A-2-Z",
ignoreYoungerThan: 1 * time.Hour,
lastCurated: testInstant.Add(-1 * 60 * time.Minute),
processor: &DeletionProcessor{
processor: NewDeletionProcessor(&DeletionProcessorOptions{
MaximumMutationPoolBatch: 15,
},
}),
},
},
sampleGroups: []sampleGroup{
@ -1404,16 +1410,20 @@ func TestCuratorDeletionProcessor(t *testing.T) {
stop := make(chan bool)
defer close(stop)
c := Curator{
c := NewCurator(&CuratorOptions{
Stop: stop,
}
})
defer c.Close()
err = c.Run(scenario.in.ignoreYoungerThan, testInstant, scenario.in.processor, curatorStates, samples, watermarkStates, updates)
if err != nil {
t.Fatal(err)
}
iterator := curatorStates.p.NewIterator(true)
iterator, err := curatorStates.p.NewIterator(true)
if err != nil {
t.Fatal(err)
}
defer iterator.Close()
for j, expected := range scenario.out.curationStates {
@ -1429,9 +1439,7 @@ func TestCuratorDeletionProcessor(t *testing.T) {
}
curationKeyDto := new(dto.CurationKey)
err = proto.Unmarshal(iterator.Key(), curationKeyDto)
if err != nil {
if err := iterator.Key(curationKeyDto); err != nil {
t.Fatalf("%d.%d. could not unmarshal: %s", i, j, err)
}
@ -1463,7 +1471,10 @@ func TestCuratorDeletionProcessor(t *testing.T) {
}
}
iterator = samples.NewIterator(true)
iterator, err = samples.NewIterator(true)
if err != nil {
t.Fatal(err)
}
defer iterator.Close()
for j, expected := range scenario.out.sampleGroups {

View file

@ -35,6 +35,21 @@ type SampleKey struct {
SampleCount uint32
}
// Constrain merges the underlying SampleKey to fit within the keyspace of
// the provided first and last keys and returns whether the key was modified.
func (s *SampleKey) Constrain(first, last *SampleKey) bool {
switch {
case s.Before(first.Fingerprint, first.FirstTimestamp):
*s = *first
return true
case last.Before(s.Fingerprint, s.FirstTimestamp):
*s = *last
return true
default:
return false
}
}
func (s *SampleKey) Equal(o *SampleKey) bool {
if s == o {
return true

View file

@ -24,7 +24,6 @@ import (
clientmodel "github.com/prometheus/client_golang/model"
"github.com/prometheus/prometheus/coding"
"github.com/prometheus/prometheus/coding/indexable"
"github.com/prometheus/prometheus/utility/test"
@ -198,12 +197,13 @@ func levelDBGetRangeValues(l *LevelDBMetricPersistence, fp *clientmodel.Fingerpr
Timestamp: indexable.EncodeTime(i.OldestInclusive),
}
e := coding.NewPBEncoder(k).MustEncode()
iterator := l.MetricSamples.NewIterator(true)
iterator, err := l.MetricSamples.NewIterator(true)
if err != nil {
panic(err)
}
defer iterator.Close()
for valid := iterator.Seek(e); valid; valid = iterator.Next() {
for valid := iterator.Seek(k); valid; valid = iterator.Next() {
retrievedKey, err := extractSampleKey(iterator)
if err != nil {
return samples, err

View file

@ -23,12 +23,9 @@ import (
clientmodel "github.com/prometheus/client_golang/model"
"github.com/prometheus/prometheus/coding"
"github.com/prometheus/prometheus/stats"
"github.com/prometheus/prometheus/storage/raw/leveldb"
"github.com/prometheus/prometheus/utility"
dto "github.com/prometheus/prometheus/model/generated"
)
type chunk Values
@ -95,6 +92,9 @@ type TieredStorage struct {
Indexer MetricIndexer
flushSema chan bool
dtoSampleKeys *dtoSampleKeyList
sampleKeys *sampleKeyList
}
// viewJob encapsulates a request to extract sample values from the datastore.
@ -140,6 +140,9 @@ func NewTieredStorage(appendToDiskQueueDepth, viewQueueDepth uint, flushMemoryIn
wmCache: wmCache,
flushSema: make(chan bool, 1),
dtoSampleKeys: newDtoSampleKeyList(10),
sampleKeys: newSampleKeyList(10),
}
for i := 0; i < tieredMemorySemaphores; i++ {
@ -323,6 +326,9 @@ func (t *TieredStorage) close() {
close(t.ViewQueue)
t.wmCache.Clear()
t.dtoSampleKeys.Close()
t.sampleKeys.Close()
t.state = tieredStorageStopping
}
@ -379,7 +385,15 @@ func (t *TieredStorage) renderView(viewJob viewJob) {
var iterator leveldb.Iterator
diskPresent := true
var firstBlock, lastBlock *SampleKey
firstBlock, _ := t.sampleKeys.Get()
defer t.sampleKeys.Give(firstBlock)
lastBlock, _ := t.sampleKeys.Get()
defer t.sampleKeys.Give(lastBlock)
sampleKeyDto, _ := t.dtoSampleKeys.Get()
defer t.dtoSampleKeys.Give(sampleKeyDto)
extractionTimer := viewJob.stats.GetTimer(stats.ViewDataExtractionTime).Start()
for _, scanJob := range scans {
@ -410,14 +424,23 @@ func (t *TieredStorage) renderView(viewJob viewJob) {
if iterator == nil {
// Get a single iterator that will be used for all data extraction
// below.
iterator = t.DiskStorage.MetricSamples.NewIterator(true)
iterator, _ = t.DiskStorage.MetricSamples.NewIterator(true)
defer iterator.Close()
if diskPresent = iterator.SeekToLast(); diskPresent {
lastBlock, _ = extractSampleKey(iterator)
if err := iterator.Key(sampleKeyDto); err != nil {
panic(err)
}
lastBlock.Load(sampleKeyDto)
if !iterator.SeekToFirst() {
diskPresent = false
} else {
firstBlock, _ = extractSampleKey(iterator)
if err := iterator.Key(sampleKeyDto); err != nil {
panic(err)
}
firstBlock.Load(sampleKeyDto)
}
}
}
@ -482,7 +505,10 @@ func (t *TieredStorage) renderView(viewJob viewJob) {
for _, op := range standingOps {
if !op.Consumed() {
filteredOps = append(filteredOps, op)
continue
}
giveBackOp(op)
}
standingOps = filteredOps
@ -515,9 +541,10 @@ func (t *TieredStorage) loadChunkAroundTime(iterator leveldb.Iterator, fingerpri
return nil, true
}
seekingKey := &SampleKey{
Fingerprint: fingerprint,
}
seekingKey, _ := t.sampleKeys.Get()
defer t.sampleKeys.Give(seekingKey)
seekingKey.Fingerprint = fingerprint
if fingerprint.Equal(firstBlock.Fingerprint) && ts.Before(firstBlock.FirstTimestamp) {
seekingKey.FirstTimestamp = firstBlock.FirstTimestamp
@ -527,21 +554,22 @@ func (t *TieredStorage) loadChunkAroundTime(iterator leveldb.Iterator, fingerpri
seekingKey.FirstTimestamp = ts
}
fd := new(dto.SampleKey)
seekingKey.Dump(fd)
dto, _ := t.dtoSampleKeys.Get()
defer t.dtoSampleKeys.Give(dto)
// Try seeking to target key.
rawKey := coding.NewPBEncoder(fd).MustEncode()
if !iterator.Seek(rawKey) {
seekingKey.Dump(dto)
if !iterator.Seek(dto) {
return chunk, true
}
var foundKey *SampleKey
var foundValues Values
foundKey, _ = extractSampleKey(iterator)
if err := iterator.Key(dto); err != nil {
panic(err)
}
seekingKey.Load(dto)
if foundKey.Fingerprint.Equal(fingerprint) {
if seekingKey.Fingerprint.Equal(fingerprint) {
// Figure out if we need to rewind by one block.
// Imagine the following supertime blocks with time ranges:
//
@ -553,16 +581,19 @@ func (t *TieredStorage) loadChunkAroundTime(iterator leveldb.Iterator, fingerpri
// iterator seek behavior.
//
// Only do the rewind if there is another chunk before this one.
if !foundKey.MayContain(ts) {
if !seekingKey.MayContain(ts) {
postValues, _ := extractSampleValues(iterator)
if !foundKey.Equal(firstBlock) {
if !seekingKey.Equal(firstBlock) {
if !iterator.Previous() {
panic("This should never return false.")
}
foundKey, _ = extractSampleKey(iterator)
if err := iterator.Key(dto); err != nil {
panic(err)
}
seekingKey.Load(dto)
if !foundKey.Fingerprint.Equal(fingerprint) {
if !seekingKey.Fingerprint.Equal(fingerprint) {
return postValues, false
}
@ -576,15 +607,18 @@ func (t *TieredStorage) loadChunkAroundTime(iterator leveldb.Iterator, fingerpri
return foundValues, false
}
if fingerprint.Less(foundKey.Fingerprint) {
if !foundKey.Equal(firstBlock) {
if fingerprint.Less(seekingKey.Fingerprint) {
if !seekingKey.Equal(firstBlock) {
if !iterator.Previous() {
panic("This should never return false.")
}
foundKey, _ = extractSampleKey(iterator)
if err := iterator.Key(dto); err != nil {
panic(err)
}
seekingKey.Load(dto)
if !foundKey.Fingerprint.Equal(fingerprint) {
if !seekingKey.Fingerprint.Equal(fingerprint) {
return nil, false
}

View file

@ -48,40 +48,48 @@ func NewViewRequestBuilder() *viewRequestBuilder {
}
}
var getValuesAtTimes = newValueAtTimeList(10 * 1024)
// Gets for the given Fingerprint either the value at that time if there is an
// match or the one or two values adjacent thereto.
func (v *viewRequestBuilder) GetMetricAtTime(fingerprint *clientmodel.Fingerprint, time time.Time) {
ops := v.operations[*fingerprint]
ops = append(ops, &getValuesAtTimeOp{
time: time,
})
op, _ := getValuesAtTimes.Get()
op.time = time
ops = append(ops, op)
v.operations[*fingerprint] = ops
}
var getValuesAtIntervals = newValueAtIntervalList(10 * 1024)
// Gets for the given Fingerprint either the value at that interval from From
// through Through if there is an match or the one or two values adjacent
// for each point.
func (v *viewRequestBuilder) GetMetricAtInterval(fingerprint *clientmodel.Fingerprint, from, through time.Time, interval time.Duration) {
ops := v.operations[*fingerprint]
ops = append(ops, &getValuesAtIntervalOp{
from: from,
through: through,
interval: interval,
})
op, _ := getValuesAtIntervals.Get()
op.from = from
op.through = through
op.interval = interval
ops = append(ops, op)
v.operations[*fingerprint] = ops
}
var getValuesAlongRanges = newValueAlongRangeList(10 * 1024)
// Gets for the given Fingerprint the values that occur inclusively from From
// through Through.
func (v *viewRequestBuilder) GetMetricRange(fingerprint *clientmodel.Fingerprint, from, through time.Time) {
ops := v.operations[*fingerprint]
ops = append(ops, &getValuesAlongRangeOp{
from: from,
through: through,
})
op, _ := getValuesAlongRanges.Get()
op.from = from
op.through = through
ops = append(ops, op)
v.operations[*fingerprint] = ops
}
var getValuesAtIntervalAlongRanges = newValueAtIntervalAlongRangeList(10 * 1024)
// Gets value ranges at intervals for the given Fingerprint:
//
// |----| |----| |----| |----|
@ -90,13 +98,13 @@ func (v *viewRequestBuilder) GetMetricRange(fingerprint *clientmodel.Fingerprint
// from interval rangeDuration through
func (v *viewRequestBuilder) GetMetricRangeAtInterval(fingerprint *clientmodel.Fingerprint, from, through time.Time, interval, rangeDuration time.Duration) {
ops := v.operations[*fingerprint]
ops = append(ops, &getValueRangeAtIntervalOp{
rangeFrom: from,
rangeThrough: from.Add(rangeDuration),
rangeDuration: rangeDuration,
interval: interval,
through: through,
})
op, _ := getValuesAtIntervalAlongRanges.Get()
op.rangeFrom = from
op.rangeThrough = from.Add(rangeDuration)
op.rangeDuration = rangeDuration
op.interval = interval
op.through = through
ops = append(ops, op)
v.operations[*fingerprint] = ops
}
@ -134,3 +142,18 @@ func (v view) appendSamples(fingerprint *clientmodel.Fingerprint, samples Values
func newView() view {
return view{NewMemorySeriesStorage(MemorySeriesOptions{})}
}
func giveBackOp(op interface{}) bool {
switch v := op.(type) {
case *getValuesAtTimeOp:
return getValuesAtTimes.Give(v)
case *getValuesAtIntervalOp:
return getValuesAtIntervals.Give(v)
case *getValuesAlongRangeOp:
return getValuesAlongRanges.Give(v)
case *getValueRangeAtIntervalOp:
return getValuesAtIntervalAlongRanges.Give(v)
default:
panic("unrecognized operation")
}
}

View file

@ -67,7 +67,7 @@ func (w *LevelDBHighWatermarker) Get(f *clientmodel.Fingerprint) (t time.Time, o
return t, ok, err
}
if !ok {
return t, ok, err
return t, ok, nil
}
t = time.Unix(v.GetTimestamp(), 0)
return t, true, nil

View file

@ -38,7 +38,7 @@ type Persistence interface {
// Close reaps all of the underlying system resources associated with this
// persistence.
Close()
Close() error
// Has informs the user whether a given key exists in the database.
Has(key proto.Message) (bool, error)
// Get retrieves the key from the database if it exists or returns nil if

View file

@ -18,8 +18,6 @@ import (
"code.google.com/p/goprotobuf/proto"
"github.com/jmhodges/levigo"
"github.com/prometheus/prometheus/coding"
)
type batch struct {
@ -35,13 +33,34 @@ func NewBatch() *batch {
}
func (b *batch) Drop(key proto.Message) {
b.batch.Delete(coding.NewPBEncoder(key).MustEncode())
buf, _ := buffers.Get()
defer buffers.Give(buf)
if err := buf.Marshal(key); err != nil {
panic(err)
}
b.batch.Delete(buf.Bytes())
b.drops++
}
func (b *batch) Put(key, value proto.Message) {
b.batch.Put(coding.NewPBEncoder(key).MustEncode(), coding.NewPBEncoder(value).MustEncode())
keyBuf, _ := buffers.Get()
defer buffers.Give(keyBuf)
if err := keyBuf.Marshal(key); err != nil {
panic(err)
}
valBuf, _ := buffers.Get()
defer buffers.Give(valBuf)
if err := valBuf.Marshal(value); err != nil {
panic(err)
}
b.batch.Put(keyBuf.Bytes(), valBuf.Bytes())
b.puts++

View file

@ -0,0 +1,46 @@
// Copyright 2013 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package leveldb
import (
"github.com/prometheus/prometheus/utility"
"code.google.com/p/goprotobuf/proto"
)
var buffers = newBufferList(50)
type bufferList struct {
l utility.FreeList
}
func (l *bufferList) Get() (*proto.Buffer, bool) {
if v, ok := l.l.Get(); ok {
return v.(*proto.Buffer), ok
}
return proto.NewBuffer(make([]byte, 0, 4096)), false
}
func (l *bufferList) Give(v *proto.Buffer) bool {
v.Reset()
return l.l.Give(v)
}
func newBufferList(cap int) *bufferList {
return &bufferList{
l: utility.NewFreeList(cap),
}
}

View file

@ -13,30 +13,29 @@
package leveldb
import (
"code.google.com/p/goprotobuf/proto"
)
// TODO: Evaluate whether to use coding.Encoder for the key and values instead
// raw bytes for consistency reasons.
// Iterator wraps Levigo and LevelDB's iterator behaviors in a manner that is
// conducive to IO-free testing.
//
// It borrows some of the operational assumptions from goskiplist, which
// functions very similarly, in that it uses no separate Valid method to
// determine health. All methods that have a return signature of (ok bool)
// assume in the real LevelDB case that if ok == false that the iterator
// must be disposed of at this given instance and recreated if future
// work is desired. This is a quirk of LevelDB itself!
type Iterator interface {
// GetError reports low-level errors, if available. This should not indicate
// that the iterator is necessarily unhealthy but maybe that the underlying
// table is corrupted itself. See the notes above for (ok bool) return
// signatures to determine iterator health.
GetError() error
Key() []byte
Next() (ok bool)
Previous() (ok bool)
Seek(key []byte) (ok bool)
SeekToFirst() (ok bool)
SeekToLast() (ok bool)
Value() []byte
Close()
Error() error
Valid() bool
SeekToFirst() bool
SeekToLast() bool
Seek(proto.Message) bool
Next() bool
Previous() bool
Key(proto.Message) error
Value(proto.Message) error
Close() error
rawKey() []byte
rawValue() []byte
}

View file

@ -20,7 +20,6 @@ import (
"code.google.com/p/goprotobuf/proto"
"github.com/jmhodges/levigo"
"github.com/prometheus/prometheus/coding"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/storage/raw"
)
@ -84,9 +83,9 @@ func (i levigoIterator) String() string {
return fmt.Sprintf("levigoIterator created at %s that is %s and %s and %s", i.creationTime, open, valid, snapshotted)
}
func (i *levigoIterator) Close() {
func (i *levigoIterator) Close() error {
if i.closed {
return
return nil
}
if i.iterator != nil {
@ -108,11 +107,18 @@ func (i *levigoIterator) Close() {
i.closed = true
i.valid = false
return
return nil
}
func (i *levigoIterator) Seek(key []byte) bool {
i.iterator.Seek(key)
func (i *levigoIterator) Seek(m proto.Message) bool {
buf, _ := buffers.Get()
defer buffers.Give(buf)
if err := buf.Marshal(m); err != nil {
panic(err)
}
i.iterator.Seek(buf.Bytes())
i.valid = i.iterator.Valid()
@ -151,18 +157,40 @@ func (i *levigoIterator) Previous() bool {
return i.valid
}
func (i levigoIterator) Key() (key []byte) {
func (i *levigoIterator) rawKey() (key []byte) {
return i.iterator.Key()
}
func (i levigoIterator) Value() (value []byte) {
func (i *levigoIterator) rawValue() (value []byte) {
return i.iterator.Value()
}
func (i levigoIterator) GetError() (err error) {
func (i *levigoIterator) Error() (err error) {
return i.iterator.GetError()
}
func (i *levigoIterator) Key(m proto.Message) error {
buf, _ := buffers.Get()
defer buffers.Give(buf)
buf.SetBuf(i.iterator.Key())
return buf.Unmarshal(m)
}
func (i *levigoIterator) Value(m proto.Message) error {
buf, _ := buffers.Get()
defer buffers.Give(buf)
buf.SetBuf(i.iterator.Value())
return buf.Unmarshal(m)
}
func (i *levigoIterator) Valid() bool {
return i.valid
}
type Compression uint
const (
@ -229,7 +257,7 @@ func NewLevelDBPersistence(o LevelDBOptions) (*LevelDBPersistence, error) {
}, nil
}
func (l *LevelDBPersistence) Close() {
func (l *LevelDBPersistence) Close() error {
// These are deferred to take advantage of forced closing in case of stack
// unwinding due to anomalies.
defer func() {
@ -268,11 +296,18 @@ func (l *LevelDBPersistence) Close() {
}
}()
return
return nil
}
func (l *LevelDBPersistence) Get(k, v proto.Message) (bool, error) {
raw, err := l.storage.Get(l.readOptions, coding.NewPBEncoder(k).MustEncode())
buf, _ := buffers.Get()
defer buffers.Give(buf)
if err := buf.Marshal(k); err != nil {
panic(err)
}
raw, err := l.storage.Get(l.readOptions, buf.Bytes())
if err != nil {
return false, err
}
@ -284,8 +319,9 @@ func (l *LevelDBPersistence) Get(k, v proto.Message) (bool, error) {
return true, nil
}
err = proto.Unmarshal(raw, v)
if err != nil {
buf.SetBuf(raw)
if err := buf.Unmarshal(v); err != nil {
return true, err
}
@ -297,11 +333,32 @@ func (l *LevelDBPersistence) Has(k proto.Message) (has bool, err error) {
}
func (l *LevelDBPersistence) Drop(k proto.Message) error {
return l.storage.Delete(l.writeOptions, coding.NewPBEncoder(k).MustEncode())
buf, _ := buffers.Get()
defer buffers.Give(buf)
if err := buf.Marshal(k); err != nil {
panic(err)
}
return l.storage.Delete(l.writeOptions, buf.Bytes())
}
func (l *LevelDBPersistence) Put(key, value proto.Message) error {
return l.storage.Put(l.writeOptions, coding.NewPBEncoder(key).MustEncode(), coding.NewPBEncoder(value).MustEncode())
keyBuf, _ := buffers.Get()
defer buffers.Give(keyBuf)
if err := keyBuf.Marshal(key); err != nil {
panic(err)
}
valBuf, _ := buffers.Get()
defer buffers.Give(valBuf)
if err := valBuf.Marshal(value); err != nil {
panic(err)
}
return l.storage.Put(l.writeOptions, keyBuf.Bytes(), valBuf.Bytes())
}
func (l *LevelDBPersistence) Commit(b raw.Batch) (err error) {
@ -333,7 +390,10 @@ func (l *LevelDBPersistence) Prune() {
}
func (l *LevelDBPersistence) Size() (uint64, error) {
iterator := l.NewIterator(false)
iterator, err := l.NewIterator(false)
if err != nil {
return 0, err
}
defer iterator.Close()
if !iterator.SeekToFirst() {
@ -342,13 +402,13 @@ func (l *LevelDBPersistence) Size() (uint64, error) {
keyspace := levigo.Range{}
keyspace.Start = iterator.Key()
keyspace.Start = iterator.rawKey()
if !iterator.SeekToLast() {
return 0, fmt.Errorf("could not seek to last key")
}
keyspace.Limit = iterator.Key()
keyspace.Limit = iterator.rawKey()
sizes := l.storage.GetApproximateSizes([]levigo.Range{keyspace})
total := uint64(0)
@ -374,7 +434,7 @@ func (l *LevelDBPersistence) Size() (uint64, error) {
// will be leaked.
//
// The iterator is optionally snapshotable.
func (l *LevelDBPersistence) NewIterator(snapshotted bool) Iterator {
func (l *LevelDBPersistence) NewIterator(snapshotted bool) (Iterator, error) {
var (
snapshot *levigo.Snapshot
readOptions *levigo.ReadOptions
@ -396,27 +456,27 @@ func (l *LevelDBPersistence) NewIterator(snapshotted bool) Iterator {
readOptions: readOptions,
snapshot: snapshot,
storage: l.storage,
}
}, nil
}
func (l *LevelDBPersistence) ForEach(decoder storage.RecordDecoder, filter storage.RecordFilter, operator storage.RecordOperator) (scannedEntireCorpus bool, err error) {
var (
iterator = l.NewIterator(true)
valid bool
)
iterator, err := l.NewIterator(true)
if err != nil {
return false, err
}
defer iterator.Close()
for valid = iterator.SeekToFirst(); valid; valid = iterator.Next() {
err = iterator.GetError()
if err != nil {
return
for valid := iterator.SeekToFirst(); valid; valid = iterator.Next() {
if err = iterator.Error(); err != nil {
return false, err
}
decodedKey, decodeErr := decoder.DecodeKey(iterator.Key())
decodedKey, decodeErr := decoder.DecodeKey(iterator.rawKey())
if decodeErr != nil {
continue
}
decodedValue, decodeErr := decoder.DecodeValue(iterator.Value())
decodedValue, decodeErr := decoder.DecodeValue(iterator.rawValue())
if decodeErr != nil {
continue
}
@ -436,6 +496,5 @@ func (l *LevelDBPersistence) ForEach(decoder storage.RecordDecoder, filter stora
}
}
}
scannedEntireCorpus = true
return
return true, nil
}

45
utility/freelist.go Normal file
View file

@ -0,0 +1,45 @@
// Copyright 2013 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package utility
type FreeList chan interface{}
func NewFreeList(cap int) FreeList {
return make(FreeList, cap)
}
func (l FreeList) Get() (interface{}, bool) {
select {
case v := <-l:
return v, true
default:
return nil, false
}
}
func (l FreeList) Give(v interface{}) bool {
select {
case l <- v:
return true
default:
return false
}
}
func (l FreeList) Close() {
close(l)
for _ = range l {
}
}

View file

@ -21,6 +21,9 @@ import (
"net/http"
"net/http/pprof"
"os"
"time"
pprof_runtime "runtime/pprof"
"code.google.com/p/gorest"
"github.com/golang/glog"
@ -63,6 +66,7 @@ func (w WebService) ServeForever() error {
exp.Handle("/databases", w.DatabasesHandler)
exp.Handle("/alerts", w.AlertsHandler)
exp.HandleFunc("/graph", graphHandler)
exp.HandleFunc("/heap", dumpHeap)
exp.Handle("/api/", compressionHandler{handler: gorest.Handle()})
exp.Handle("/metrics", prometheus.DefaultHandler)
@ -139,6 +143,18 @@ func executeTemplate(w http.ResponseWriter, name string, data interface{}) {
}
}
func dumpHeap(w http.ResponseWriter, r *http.Request) {
target := fmt.Sprintf("/tmp/%d.heap", time.Now().Unix())
f, err := os.Create(target)
if err != nil {
glog.Error("Could not dump heap: ", err)
}
fmt.Fprintf(w, "Writing to %s...", target)
defer f.Close()
pprof_runtime.WriteHeapProfile(f)
fmt.Fprintf(w, "Done")
}
func MustBuildServerUrl() string {
_, port, err := net.SplitHostPort(*listenAddress)
if err != nil {