diff --git a/main.go b/main.go
index bd8779c54..22aee64da 100644
--- a/main.go
+++ b/main.go
@@ -80,7 +80,7 @@ type prometheus struct {
 	tailCompactionTimer *time.Ticker
 	deletionTimer       *time.Ticker
 
-	curationMutex sync.Mutex
+	curationSema chan bool
 
 	stopBackgroundOperations chan bool
 
 	unwrittenSamples chan *extraction.Result
@@ -104,41 +104,62 @@ func (p *prometheus) interruptHandler() {
 }
 
 func (p *prometheus) compact(olderThan time.Duration, groupSize int) error {
-	p.curationMutex.Lock()
-	defer p.curationMutex.Unlock()
-
-	processor := &metric.CompactionProcessor{
-		MaximumMutationPoolBatch: groupSize * 3,
-		MinimumGroupSize:         groupSize,
+	select {
+	case p.curationSema <- true:
+	default:
+		glog.Warningf("Deferred compaction for %s with group size %d due to existing operation.", olderThan, groupSize)
+		return nil
 	}
 
-	curator := metric.Curator{
+	defer func() {
+		<-p.curationSema
+	}()
+
+	processor := metric.NewCompactionProcessor(&metric.CompactionProcessorOptions{
+		MaximumMutationPoolBatch: groupSize * 3,
+		MinimumGroupSize:         groupSize,
+	})
+	defer processor.Close()
+
+	curator := metric.NewCurator(&metric.CuratorOptions{
 		Stop: p.stopBackgroundOperations,
 
 		ViewQueue: p.storage.ViewQueue,
-	}
+	})
+	defer curator.Close()
 
 	return curator.Run(olderThan, time.Now(), processor, p.storage.DiskStorage.CurationRemarks, p.storage.DiskStorage.MetricSamples, p.storage.DiskStorage.MetricHighWatermarks, p.curationState)
}
 
 func (p *prometheus) delete(olderThan time.Duration, batchSize int) error {
-	p.curationMutex.Lock()
-	defer p.curationMutex.Unlock()
-
-	processor := &metric.DeletionProcessor{
-		MaximumMutationPoolBatch: batchSize,
+	select {
+	case p.curationSema <- true:
+	default:
+		glog.Warningf("Deferred deletion for %s with batch size %d due to existing operation.", olderThan, batchSize)
+		return nil
 	}
 
-	curator := metric.Curator{
+	defer func() {
+		<-p.curationSema
+	}()
+
+	processor := metric.NewDeletionProcessor(&metric.DeletionProcessorOptions{
+		MaximumMutationPoolBatch: batchSize,
+	})
+	defer processor.Close()
+
+	curator := metric.NewCurator(&metric.CuratorOptions{
 		Stop: p.stopBackgroundOperations,
 
 		ViewQueue: p.storage.ViewQueue,
-	}
+	})
+	defer curator.Close()
 
 	return curator.Run(olderThan, time.Now(), processor, p.storage.DiskStorage.CurationRemarks, p.storage.DiskStorage.MetricSamples, p.storage.DiskStorage.MetricHighWatermarks, p.curationState)
 }
 
 func (p *prometheus) close() {
+	select {
+	case p.curationSema <- true:
+	default:
+	}
+
 	if p.headCompactionTimer != nil {
 		p.headCompactionTimer.Stop()
 	}
@@ -156,8 +177,6 @@ func (p *prometheus) close() {
 		p.stopBackgroundOperations <- true
 	}
 
-	p.curationMutex.Lock()
-
 	p.ruleManager.Stop()
 
 	p.storage.Close()
@@ -267,6 +286,7 @@ func main() {
 		deletionTimer:       deletionTimer,
 
 		curationState: prometheusStatus,
+		curationSema:  make(chan bool, 1),
 
 		unwrittenSamples: unwrittenSamples,
 
diff --git a/storage/metric/curator.go b/storage/metric/curator.go
index d8f8ef5ea..c8daf8ebf 100644
--- a/storage/metric/curator.go
+++ b/storage/metric/curator.go
@@ -15,6 +15,7 @@ package metric
 
 import (
 	"bytes"
+	"errors"
 	"fmt"
 	"strings"
 	"time"
@@ -24,7 +25,6 @@ import (
 
 	clientmodel "github.com/prometheus/client_golang/model"
 
-	"github.com/prometheus/prometheus/coding"
 	"github.com/prometheus/prometheus/storage"
 	"github.com/prometheus/prometheus/storage/raw"
 	"github.com/prometheus/prometheus/storage/raw/leveldb"
@@ -34,6 +34,8 @@ import (
 
 const curationYieldPeriod = 250 * time.Millisecond
 
+var errIllegalIterator = errors.New("iterator invalid")
+
 // CurationStateUpdater receives updates about the curation state.
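+// It is implemented by, e.g., the status-page handler that main.go wires in
+// as curationState (prometheusStatus), so curation progress can be reported
+// to operators.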
type CurationStateUpdater interface { UpdateCurationState(*CurationState) @@ -48,10 +50,7 @@ type CurationState struct { Fingerprint *clientmodel.Fingerprint } -// curator is responsible for effectuating a given curation policy across the -// stored samples on-disk. This is useful to compact sparse sample values into -// single sample entities to reduce keyspace load on the datastore. -type Curator struct { +type CuratorOptions struct { // Stop functions as a channel that when empty allows the curator to operate. // The moment a value is ingested inside of it, the curator goes into drain // mode. @@ -60,6 +59,29 @@ type Curator struct { ViewQueue chan viewJob } +// curator is responsible for effectuating a given curation policy across the +// stored samples on-disk. This is useful to compact sparse sample values into +// single sample entities to reduce keyspace load on the datastore. +type Curator struct { + stop chan bool + + viewQueue chan viewJob + + dtoSampleKeys *dtoSampleKeyList + sampleKeys *sampleKeyList +} + +func NewCurator(o *CuratorOptions) *Curator { + return &Curator{ + stop: o.Stop, + + viewQueue: o.ViewQueue, + + dtoSampleKeys: newDtoSampleKeyList(10), + sampleKeys: newSampleKeyList(10), + } +} + // watermarkScanner converts (dto.Fingerprint, dto.MetricHighWatermark) doubles // into (model.Fingerprint, model.Watermark) doubles. // @@ -95,6 +117,9 @@ type watermarkScanner struct { firstBlock, lastBlock *SampleKey ViewQueue chan viewJob + + dtoSampleKeys *dtoSampleKeyList + sampleKeys *sampleKeyList } // run facilitates the curation lifecycle. @@ -122,7 +147,10 @@ func (c *Curator) Run(ignoreYoungerThan time.Duration, instant time.Time, proces defer status.UpdateCurationState(&CurationState{Active: false}) - iterator := samples.NewIterator(true) + iterator, err := samples.NewIterator(true) + if err != nil { + return err + } defer iterator.Close() if !iterator.SeekToLast() { @@ -130,21 +158,40 @@ func (c *Curator) Run(ignoreYoungerThan time.Duration, instant time.Time, proces return } - lastBlock, _ := extractSampleKey(iterator) + + keyDto, _ := c.dtoSampleKeys.Get() + defer c.dtoSampleKeys.Give(keyDto) + + lastBlock, _ := c.sampleKeys.Get() + defer c.sampleKeys.Give(lastBlock) + + if err := iterator.Key(keyDto); err != nil { + panic(err) + } + + lastBlock.Load(keyDto) if !iterator.SeekToFirst() { glog.Info("Empty database; skipping curation.") return } - firstBlock, _ := extractSampleKey(iterator) + + firstBlock, _ := c.sampleKeys.Get() + defer c.sampleKeys.Give(firstBlock) + + if err := iterator.Key(keyDto); err != nil { + panic(err) + } + + firstBlock.Load(keyDto) scanner := &watermarkScanner{ curationState: curationState, ignoreYoungerThan: ignoreYoungerThan, processor: processor, status: status, - stop: c.Stop, + stop: c.stop, stopAt: instant.Add(-1 * ignoreYoungerThan), sampleIterator: iterator, @@ -153,7 +200,10 @@ func (c *Curator) Run(ignoreYoungerThan time.Duration, instant time.Time, proces firstBlock: firstBlock, lastBlock: lastBlock, - ViewQueue: c.ViewQueue, + ViewQueue: c.viewQueue, + + dtoSampleKeys: c.dtoSampleKeys, + sampleKeys: c.sampleKeys, } // Right now, the ability to stop a curation is limited to the beginning of @@ -167,11 +217,16 @@ func (c *Curator) Run(ignoreYoungerThan time.Duration, instant time.Time, proces // drain instructs the curator to stop at the next convenient moment as to not // introduce data inconsistencies. 
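+// The send below is best-effort: if a stop signal is already pending, Drain
+// does not enqueue a second one.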
func (c *Curator) Drain() {
-	if len(c.Stop) == 0 {
-		c.Stop <- true
+	if len(c.stop) == 0 {
+		c.stop <- true
 	}
 }
 
+func (c *Curator) Close() {
+	c.dtoSampleKeys.Close()
+	c.sampleKeys.Close()
+}
+
 func (w *watermarkScanner) DecodeKey(in interface{}) (interface{}, error) {
 	key := new(dto.Fingerprint)
 	bytes := in.([]byte)
@@ -284,26 +339,32 @@ func (w *watermarkScanner) curationConsistent(f *clientmodel.Fingerprint, waterm
 }
 
 func (w *watermarkScanner) Operate(key, _ interface{}) (oErr *storage.OperatorError) {
+	fingerprint := key.(*clientmodel.Fingerprint)
+
+	glog.Infof("Curating %s...", fingerprint)
+
 	if len(w.ViewQueue) > 0 {
+		glog.Warning("Deferred due to view queue.")
 		time.Sleep(curationYieldPeriod)
 	}
 
-	fingerprint := key.(*clientmodel.Fingerprint)
-
 	if fingerprint.Less(w.firstBlock.Fingerprint) {
+		glog.Warning("Skipped since before keyspace.")
 		return nil
 	}
 	if w.lastBlock.Fingerprint.Less(fingerprint) {
+		glog.Warning("Skipped since after keyspace.")
 		return nil
 	}
 
-	curationState, present, err := w.curationState.Get(&curationKey{
+	curationState, _, err := w.curationState.Get(&curationKey{
 		Fingerprint:              fingerprint,
 		ProcessorMessageRaw:      w.processor.Signature(),
 		ProcessorMessageTypeName: w.processor.Name(),
 		IgnoreYoungerThan:        w.ignoreYoungerThan,
 	})
 	if err != nil {
+		glog.Warningf("Unable to get curation state: %s", err)
 		// An anomaly with the curation remark is likely not fatal in the sense that
 		// there was a decoding error with the entity and shouldn't be cause to stop
 		// work. The process will simply start from a pessimistic work time and
@@ -311,47 +372,45 @@ func (w *watermarkScanner) Operate(key, _ interface{}) (oErr *storage.OperatorEr
 		return &storage.OperatorError{error: err, Continuable: true}
 	}
 
-	keySet := &SampleKey{
-		Fingerprint: fingerprint,
+	keySet, _ := w.sampleKeys.Get()
+	defer w.sampleKeys.Give(keySet)
+
+	keySet.Fingerprint = fingerprint
+	keySet.FirstTimestamp = curationState
+
+	// Invariant: The fingerprint tests above ensure that we have the same
+	// fingerprint.
+	keySet.Constrain(w.firstBlock, w.lastBlock)
+
+	seeker := &iteratorSeekerState{
+		i: w.sampleIterator,
+
+		obj: keySet,
+
+		first: w.firstBlock,
+		last:  w.lastBlock,
+
+		dtoSampleKeys: w.dtoSampleKeys,
+		sampleKeys:    w.sampleKeys,
 	}
 
-	if !present && fingerprint.Equal(w.firstBlock.Fingerprint) {
-		// If the fingerprint is the same, then we simply need to use the earliest
-		// block found in the database.
-		*keySet = *w.firstBlock
-	} else if present {
-		keySet.FirstTimestamp = curationState
+	for state := seeker.initialize; state != nil; state = state() {
 	}
 
-	dto := new(dto.SampleKey)
-	keySet.Dump(dto)
-	prospectiveKey := coding.NewPBEncoder(dto).MustEncode()
-	if !w.sampleIterator.Seek(prospectiveKey) {
-		// LevelDB is picky about the seek ranges. If an iterator was invalidated,
-		// no work may occur, and the iterator cannot be recovered.
-		return &storage.OperatorError{error: fmt.Errorf("Illegal Condition: Iterator invalidated due to seek range."), Continuable: false}
+	if seeker.err != nil {
+		glog.Warningf("Got error in state machine: %s", seeker.err)
+
+		return &storage.OperatorError{error: seeker.err, Continuable: !seeker.iteratorInvalid}
 	}
 
-	for {
-		sampleKey, err := extractSampleKey(w.sampleIterator)
-		if err != nil {
-			return
-		}
-		if !sampleKey.Fingerprint.Equal(fingerprint) {
-			return
-		}
+	if seeker.iteratorInvalid {
+		glog.Warning("Got illegal iterator in state machine.")
 
-		if !present {
-			break
-		}
+		return &storage.OperatorError{error: errIllegalIterator, Continuable: false}
+	}
 
-		if !(sampleKey.FirstTimestamp.Before(curationState) && sampleKey.LastTimestamp.Before(curationState)) {
-			break
-		}
-
-		if !w.sampleIterator.Next() {
-			return
-		}
+	if !seeker.seriesOperable {
+		return
 	}
 
 	lastTime, err := w.processor.Apply(w.sampleIterator, w.samples, w.stopAt, fingerprint)
diff --git a/storage/metric/freelist.go b/storage/metric/freelist.go
new file mode 100644
index 000000000..0ad318364
--- /dev/null
+++ b/storage/metric/freelist.go
@@ -0,0 +1,182 @@
+// Copyright 2013 Prometheus Team
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package metric
+
+import (
+	"github.com/prometheus/prometheus/utility"
+
+	dto "github.com/prometheus/prometheus/model/generated"
+)
+
+type dtoSampleKeyList struct {
+	l utility.FreeList
+}
+
+func newDtoSampleKeyList(cap int) *dtoSampleKeyList {
+	return &dtoSampleKeyList{
+		l: utility.NewFreeList(cap),
+	}
+}
+
+func (l *dtoSampleKeyList) Get() (*dto.SampleKey, bool) {
+	if v, ok := l.l.Get(); ok {
+		return v.(*dto.SampleKey), ok
+	}
+
+	return new(dto.SampleKey), false
+}
+
+func (l *dtoSampleKeyList) Give(v *dto.SampleKey) bool {
+	v.Reset()
+
+	return l.l.Give(v)
+}
+
+func (l *dtoSampleKeyList) Close() {
+	l.l.Close()
+}
+
+type sampleKeyList struct {
+	l utility.FreeList
+}
+
+var defaultSampleKey = new(SampleKey)
+
+func newSampleKeyList(cap int) *sampleKeyList {
+	return &sampleKeyList{
+		l: utility.NewFreeList(cap),
+	}
+}
+
+func (l *sampleKeyList) Get() (*SampleKey, bool) {
+	if v, ok := l.l.Get(); ok {
+		return v.(*SampleKey), ok
+	}
+
+	return new(SampleKey), false
+}
+
+func (l *sampleKeyList) Give(v *SampleKey) bool {
+	*v = *defaultSampleKey
+
+	return l.l.Give(v)
+}
+
+func (l *sampleKeyList) Close() {
+	l.l.Close()
+}
+
+type valueAtTimeList struct {
+	l utility.FreeList
+}
+
+func (l *valueAtTimeList) Get() (*getValuesAtTimeOp, bool) {
+	if v, ok := l.l.Get(); ok {
+		return v.(*getValuesAtTimeOp), ok
+	}
+
+	return new(getValuesAtTimeOp), false
+}
+
+var pGetValuesAtTimeOp = new(getValuesAtTimeOp)
+
+func (l *valueAtTimeList) Give(v *getValuesAtTimeOp) bool {
+	*v = *pGetValuesAtTimeOp
+
+	return l.l.Give(v)
+}
+
+func newValueAtTimeList(cap int) *valueAtTimeList {
+	return &valueAtTimeList{
+		l: utility.NewFreeList(cap),
+	}
+}
+
+type valueAtIntervalList struct {
+	l utility.FreeList
+}
+
+func (l *valueAtIntervalList) Get() (*getValuesAtIntervalOp,
bool) { + if v, ok := l.l.Get(); ok { + return v.(*getValuesAtIntervalOp), ok + } + + return new(getValuesAtIntervalOp), false +} + +var pGetValuesAtIntervalOp = new(getValuesAtIntervalOp) + +func (l *valueAtIntervalList) Give(v *getValuesAtIntervalOp) bool { + *v = *pGetValuesAtIntervalOp + + return l.l.Give(v) +} + +func newValueAtIntervalList(cap int) *valueAtIntervalList { + return &valueAtIntervalList{ + l: utility.NewFreeList(cap), + } +} + +type valueAlongRangeList struct { + l utility.FreeList +} + +func (l *valueAlongRangeList) Get() (*getValuesAlongRangeOp, bool) { + if v, ok := l.l.Get(); ok { + return v.(*getValuesAlongRangeOp), ok + } + + return new(getValuesAlongRangeOp), false +} + +var pGetValuesAlongRangeOp = new(getValuesAlongRangeOp) + +func (l *valueAlongRangeList) Give(v *getValuesAlongRangeOp) bool { + *v = *pGetValuesAlongRangeOp + + return l.l.Give(v) +} + +func newValueAlongRangeList(cap int) *valueAlongRangeList { + return &valueAlongRangeList{ + l: utility.NewFreeList(cap), + } +} + +type valueAtIntervalAlongRangeList struct { + l utility.FreeList +} + +func (l *valueAtIntervalAlongRangeList) Get() (*getValueRangeAtIntervalOp, bool) { + if v, ok := l.l.Get(); ok { + return v.(*getValueRangeAtIntervalOp), ok + } + + return new(getValueRangeAtIntervalOp), false +} + +var pGetValueRangeAtIntervalOp = new(getValueRangeAtIntervalOp) + +func (l *valueAtIntervalAlongRangeList) Give(v *getValueRangeAtIntervalOp) bool { + *v = *pGetValueRangeAtIntervalOp + + return l.l.Give(v) +} + +func newValueAtIntervalAlongRangeList(cap int) *valueAtIntervalAlongRangeList { + return &valueAtIntervalAlongRangeList{ + l: utility.NewFreeList(cap), + } +} diff --git a/storage/metric/leveldb.go b/storage/metric/leveldb.go index b5b0dee11..c3e7bbb64 100644 --- a/storage/metric/leveldb.go +++ b/storage/metric/leveldb.go @@ -402,8 +402,7 @@ func (l *LevelDBMetricPersistence) AppendSamples(samples clientmodel.Samples) (e func extractSampleKey(i leveldb.Iterator) (*SampleKey, error) { k := &dto.SampleKey{} - err := proto.Unmarshal(i.Key(), k) - if err != nil { + if err := i.Key(k); err != nil { return nil, err } @@ -415,8 +414,7 @@ func extractSampleKey(i leveldb.Iterator) (*SampleKey, error) { func extractSampleValues(i leveldb.Iterator) (Values, error) { v := &dto.SampleValueSeries{} - err := proto.Unmarshal(i.Value(), v) - if err != nil { + if err := i.Value(v); err != nil { return nil, err } diff --git a/storage/metric/objective.go b/storage/metric/objective.go new file mode 100644 index 000000000..657d5a8f1 --- /dev/null +++ b/storage/metric/objective.go @@ -0,0 +1,212 @@ +// Copyright 2013 Prometheus Team +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+
+package metric
+
+import (
+	"fmt"
+
+	"github.com/golang/glog"
+
+	"github.com/prometheus/prometheus/storage/raw/leveldb"
+
+	dto "github.com/prometheus/prometheus/model/generated"
+)
+
+type iteratorSeekerState struct {
+	// Immutable State
+	i leveldb.Iterator
+
+	obj *SampleKey
+
+	first, last *SampleKey
+
+	dtoSampleKeys *dtoSampleKeyList
+	sampleKeys    *sampleKeyList
+
+	// Mutable State
+	iteratorInvalid bool
+	seriesOperable  bool
+	err             error
+
+	key    *SampleKey
+	keyDto *dto.SampleKey
+}
+
+// iteratorSeeker is a function that models a state machine state and
+// is responsible for choosing the subsequent state given the present
+// disposition.
+//
+// It returns the next state, or nil if no remaining transition is possible.
+// Errors and the final disposition are not returned directly; they are
+// recorded in the surrounding iteratorSeekerState (err, iteratorInvalid, and
+// seriesOperable), which the caller inspects once the machine halts to decide
+// whether the iterator is still usable and whether work may proceed with the
+// current fingerprint.
+type iteratorSeeker func() iteratorSeeker
+
+func (s *iteratorSeekerState) initialize() iteratorSeeker {
+	s.key, _ = s.sampleKeys.Get()
+	s.keyDto, _ = s.dtoSampleKeys.Get()
+
+	return s.start
+}
+
+func (s *iteratorSeekerState) destroy() iteratorSeeker {
+	s.sampleKeys.Give(s.key)
+	s.dtoSampleKeys.Give(s.keyDto)
+
+	return nil
+}
+
+func (s *iteratorSeekerState) start() iteratorSeeker {
+	switch {
+	case s.obj.Fingerprint.Less(s.first.Fingerprint):
+		// The fingerprint does not exist in the database.
+		return s.destroy
+
+	case s.last.Fingerprint.Less(s.obj.Fingerprint):
+		// The fingerprint does not exist in the database.
+		return s.destroy
+
+	case s.obj.Fingerprint.Equal(s.first.Fingerprint) && s.obj.FirstTimestamp.Before(s.first.FirstTimestamp):
+		// The fingerprint is the first fingerprint, but we've requested a value
+		// before what exists in the database.
+		return s.seekBeginning
+
+	case s.last.Before(s.obj.Fingerprint, s.obj.FirstTimestamp):
+		// The requested time for work is after the last sample in the database; we
+		// can't do anything!
+		return s.destroy
+
+	default:
+		return s.initialSeek
+	}
+}
+
+func (s *iteratorSeekerState) seekBeginning() iteratorSeeker {
+	s.i.SeekToFirst()
+	if !s.i.Valid() {
+		s.err = s.i.Error()
+		// If we can't seek to the beginning, there isn't any hope for us.
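+		// Record the failure and mark the iterator unusable so that Operate
+		// surfaces a non-continuable error rather than curating from garbage.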
+ glog.Warning("iterator went bad: %s", s.err) + s.iteratorInvalid = true + return s.destroy + } + + return s.initialMatchFingerprint +} + +func (s *iteratorSeekerState) initialSeek() iteratorSeeker { + s.obj.Dump(s.keyDto) + + s.i.Seek(s.keyDto) + if !s.i.Valid() { + s.err = s.i.Error() + glog.Warningf("iterator went bad %s", s.err) + s.iteratorInvalid = true + return s.destroy + } + + return s.initialMatchFingerprint +} + +func (s *iteratorSeekerState) initialMatchFingerprint() iteratorSeeker { + if err := s.i.Key(s.keyDto); err != nil { + s.err = err + return s.destroy + } + + s.key.Load(s.keyDto) + + switch { + case s.obj.Fingerprint.Less(s.key.Fingerprint): + return s.initialFingerprintOvershot + + case s.key.Fingerprint.Less(s.obj.Fingerprint): + panic("violated invariant") + + default: + return s.initialMatchTime + } +} + +func (s *iteratorSeekerState) initialFingerprintOvershot() iteratorSeeker { + s.i.Previous() + if !s.i.Valid() { + glog.Warningf("Could not backtrack for %s", s) + panic("violated invariant") + } + + if err := s.i.Key(s.keyDto); err != nil { + s.err = err + return s.destroy + } + + s.key.Load(s.keyDto) + + if !s.key.Fingerprint.Equal(s.obj.Fingerprint) { + return s.destroy + } + + return s.initialMatchTime +} + +func (s *iteratorSeekerState) initialMatchTime() iteratorSeeker { + switch { + case s.key.MayContain(s.obj.FirstTimestamp): + s.seriesOperable = true + return s.destroy + + case s.key.Equal(s.first), s.obj.FirstTimestamp.Equal(s.key.FirstTimestamp): + s.seriesOperable = true + return s.destroy + + case s.obj.FirstTimestamp.Before(s.key.FirstTimestamp): + return s.reCue + + default: + panic("violated invariant " + fmt.Sprintln(s.obj, s.key)) + } +} + +func (s *iteratorSeekerState) reCue() iteratorSeeker { + s.i.Previous() + if !s.i.Valid() { + glog.Warningf("Could not backtrack for %s", s) + panic("violated invariant") + } + + if err := s.i.Key(s.keyDto); err != nil { + s.err = err + return s.destroy + } + + s.key.Load(s.keyDto) + + if !s.key.Fingerprint.Equal(s.obj.Fingerprint) { + return s.fastForward + } + + s.seriesOperable = true + return s.destroy +} + +func (s *iteratorSeekerState) fastForward() iteratorSeeker { + s.i.Next() + if !s.i.Valid() { + glog.Warningf("Could not fast-forward for %s", s) + panic("violated invariant") + } + + s.seriesOperable = true + return s.destroy +} diff --git a/storage/metric/processor.go b/storage/metric/processor.go index 831032555..b1abc606b 100644 --- a/storage/metric/processor.go +++ b/storage/metric/processor.go @@ -50,18 +50,14 @@ type Processor interface { // CompactionProcessor combines sparse values in the database together such // that at least MinimumGroupSize-sized chunks are grouped together. type CompactionProcessor struct { - // MaximumMutationPoolBatch represents approximately the largest pending - // batch of mutation operations for the database before pausing to - // commit before resumption. - // - // A reasonable value would be (MinimumGroupSize * 2) + 1. - MaximumMutationPoolBatch int - // MinimumGroupSize represents the smallest allowed sample chunk size in the - // database. - MinimumGroupSize int + maximumMutationPoolBatch int + minimumGroupSize int // signature is the byte representation of the CompactionProcessor's settings, // used for purely memoization purposes across an instance. 
signature []byte + + dtoSampleKeys *dtoSampleKeyList + sampleKeys *sampleKeyList } func (p *CompactionProcessor) Name() string { @@ -71,7 +67,7 @@ func (p *CompactionProcessor) Name() string { func (p *CompactionProcessor) Signature() []byte { if len(p.signature) == 0 { out, err := proto.Marshal(&dto.CompactionProcessorDefinition{ - MinimumGroupSize: proto.Uint32(uint32(p.MinimumGroupSize)), + MinimumGroupSize: proto.Uint32(uint32(p.minimumGroupSize)), }) if err != nil { panic(err) @@ -84,7 +80,7 @@ func (p *CompactionProcessor) Signature() []byte { } func (p *CompactionProcessor) String() string { - return fmt.Sprintf("compactionProcessor for minimum group size %d", p.MinimumGroupSize) + return fmt.Sprintf("compactionProcessor for minimum group size %d", p.minimumGroupSize) } func (p *CompactionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPersistence raw.Persistence, stopAt time.Time, fingerprint *clientmodel.Fingerprint) (lastCurated time.Time, err error) { @@ -98,16 +94,22 @@ func (p *CompactionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPers var pendingMutations = 0 var pendingSamples Values - var sampleKey *SampleKey var unactedSamples Values var lastTouchedTime time.Time var keyDropped bool - sampleKey, err = extractSampleKey(sampleIterator) - if err != nil { + sampleKey, _ := p.sampleKeys.Get() + defer p.sampleKeys.Give(sampleKey) + + sampleKeyDto, _ := p.dtoSampleKeys.Get() + defer p.dtoSampleKeys.Give(sampleKeyDto) + + if err = sampleIterator.Key(sampleKeyDto); err != nil { return } + sampleKey.Load(sampleKeyDto) + unactedSamples, err = extractSampleValues(sampleIterator) if err != nil { return @@ -129,10 +131,11 @@ func (p *CompactionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPers keyDropped = false - sampleKey, err = extractSampleKey(sampleIterator) - if err != nil { + if err = sampleIterator.Key(sampleKeyDto); err != nil { return } + sampleKey.Load(sampleKeyDto) + unactedSamples, err = extractSampleValues(sampleIterator) if err != nil { return @@ -141,7 +144,7 @@ func (p *CompactionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPers // If the number of pending mutations exceeds the allowed batch amount, // commit to disk and delete the batch. A new one will be recreated if // necessary. 
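+		// This keeps any single curation pass from accumulating an unbounded
+		// write batch in memory.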
- case pendingMutations >= p.MaximumMutationPoolBatch: + case pendingMutations >= p.maximumMutationPoolBatch: err = samplesPersistence.Commit(pendingBatch) if err != nil { return @@ -152,11 +155,11 @@ func (p *CompactionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPers pendingBatch.Close() pendingBatch = nil - case len(pendingSamples) == 0 && len(unactedSamples) >= p.MinimumGroupSize: + case len(pendingSamples) == 0 && len(unactedSamples) >= p.minimumGroupSize: lastTouchedTime = unactedSamples[len(unactedSamples)-1].Timestamp unactedSamples = Values{} - case len(pendingSamples)+len(unactedSamples) < p.MinimumGroupSize: + case len(pendingSamples)+len(unactedSamples) < p.minimumGroupSize: if !keyDropped { k := new(dto.SampleKey) sampleKey.Dump(k) @@ -170,7 +173,7 @@ func (p *CompactionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPers pendingMutations++ // If the number of pending writes equals the target group size - case len(pendingSamples) == p.MinimumGroupSize: + case len(pendingSamples) == p.minimumGroupSize: k := new(dto.SampleKey) newSampleKey := pendingSamples.ToSampleKey(fingerprint) newSampleKey.Dump(k) @@ -187,9 +190,9 @@ func (p *CompactionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPers keyDropped = true } - if len(unactedSamples) > p.MinimumGroupSize { - pendingSamples = unactedSamples[:p.MinimumGroupSize] - unactedSamples = unactedSamples[p.MinimumGroupSize:] + if len(unactedSamples) > p.minimumGroupSize { + pendingSamples = unactedSamples[:p.minimumGroupSize] + unactedSamples = unactedSamples[p.minimumGroupSize:] lastTouchedTime = unactedSamples[len(unactedSamples)-1].Timestamp } else { pendingSamples = unactedSamples @@ -198,14 +201,14 @@ func (p *CompactionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPers } } - case len(pendingSamples)+len(unactedSamples) >= p.MinimumGroupSize: + case len(pendingSamples)+len(unactedSamples) >= p.minimumGroupSize: if !keyDropped { k := new(dto.SampleKey) sampleKey.Dump(k) pendingBatch.Drop(k) keyDropped = true } - remainder := p.MinimumGroupSize - len(pendingSamples) + remainder := p.minimumGroupSize - len(pendingSamples) pendingSamples = append(pendingSamples, unactedSamples[:remainder]...) unactedSamples = unactedSamples[remainder:] if len(unactedSamples) == 0 { @@ -244,15 +247,42 @@ func (p *CompactionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPers return } -// DeletionProcessor deletes sample blocks older than a defined value. -type DeletionProcessor struct { +func (p *CompactionProcessor) Close() { + p.dtoSampleKeys.Close() + p.sampleKeys.Close() +} + +type CompactionProcessorOptions struct { // MaximumMutationPoolBatch represents approximately the largest pending // batch of mutation operations for the database before pausing to // commit before resumption. + // + // A reasonable value would be (MinimumGroupSize * 2) + 1. MaximumMutationPoolBatch int + // MinimumGroupSize represents the smallest allowed sample chunk size in the + // database. + MinimumGroupSize int +} + +func NewCompactionProcessor(o *CompactionProcessorOptions) *CompactionProcessor { + return &CompactionProcessor{ + maximumMutationPoolBatch: o.MaximumMutationPoolBatch, + minimumGroupSize: o.MinimumGroupSize, + + dtoSampleKeys: newDtoSampleKeyList(10), + sampleKeys: newSampleKeyList(10), + } +} + +// DeletionProcessor deletes sample blocks older than a defined value. 
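+// Like CompactionProcessor, it leases its scratch keys from per-instance
+// free lists, which is why callers must Close it when finished.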
+type DeletionProcessor struct { + maximumMutationPoolBatch int // signature is the byte representation of the DeletionProcessor's settings, // used for purely memoization purposes across an instance. signature []byte + + dtoSampleKeys *dtoSampleKeyList + sampleKeys *sampleKeyList } func (p *DeletionProcessor) Name() string { @@ -286,10 +316,16 @@ func (p *DeletionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPersis } }() - sampleKey, err := extractSampleKey(sampleIterator) - if err != nil { + sampleKeyDto, _ := p.dtoSampleKeys.Get() + defer p.dtoSampleKeys.Give(sampleKeyDto) + + sampleKey, _ := p.sampleKeys.Get() + defer p.sampleKeys.Give(sampleKey) + + if err = sampleIterator.Key(sampleKeyDto); err != nil { return } + sampleKey.Load(sampleKeyDto) sampleValues, err := extractSampleValues(sampleIterator) if err != nil { @@ -312,10 +348,11 @@ func (p *DeletionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPersis return lastCurated, fmt.Errorf("Illegal Condition: Invalid Iterator on Continuation") } - sampleKey, err = extractSampleKey(sampleIterator) - if err != nil { + if err = sampleIterator.Key(sampleKeyDto); err != nil { return } + sampleKey.Load(sampleKeyDto) + sampleValues, err = extractSampleValues(sampleIterator) if err != nil { return @@ -324,7 +361,7 @@ func (p *DeletionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPersis // If the number of pending mutations exceeds the allowed batch amount, // commit to disk and delete the batch. A new one will be recreated if // necessary. - case pendingMutations >= p.MaximumMutationPoolBatch: + case pendingMutations >= p.maximumMutationPoolBatch: err = samplesPersistence.Commit(pendingBatch) if err != nil { return @@ -379,3 +416,24 @@ func (p *DeletionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPersis return } + +func (p *DeletionProcessor) Close() { + p.dtoSampleKeys.Close() + p.sampleKeys.Close() +} + +type DeletionProcessorOptions struct { + // MaximumMutationPoolBatch represents approximately the largest pending + // batch of mutation operations for the database before pausing to + // commit before resumption. 
+ MaximumMutationPoolBatch int +} + +func NewDeletionProcessor(o *DeletionProcessorOptions) *DeletionProcessor { + return &DeletionProcessor{ + maximumMutationPoolBatch: o.MaximumMutationPoolBatch, + + dtoSampleKeys: newDtoSampleKeyList(10), + sampleKeys: newSampleKeyList(10), + } +} diff --git a/storage/metric/processor_test.go b/storage/metric/processor_test.go index 1d1a1687a..706e1ec2a 100644 --- a/storage/metric/processor_test.go +++ b/storage/metric/processor_test.go @@ -123,10 +123,10 @@ func TestCuratorCompactionProcessor(t *testing.T) { }{ { in: in{ - processor: &CompactionProcessor{ + processor: NewCompactionProcessor(&CompactionProcessorOptions{ MinimumGroupSize: 5, MaximumMutationPoolBatch: 15, - }, + }), ignoreYoungerThan: 1 * time.Hour, groupSize: 5, curationStates: fixture.Pairs{ @@ -134,27 +134,27 @@ func TestCuratorCompactionProcessor(t *testing.T) { fingerprint: "0001-A-1-Z", ignoreYoungerThan: 1 * time.Hour, lastCurated: testInstant.Add(-1 * 30 * time.Minute), - processor: &CompactionProcessor{ + processor: NewCompactionProcessor(&CompactionProcessorOptions{ MinimumGroupSize: 5, MaximumMutationPoolBatch: 15, - }, + }), }, curationState{ fingerprint: "0002-A-2-Z", ignoreYoungerThan: 1 * time.Hour, lastCurated: testInstant.Add(-1 * 90 * time.Minute), - processor: &CompactionProcessor{ + processor: NewCompactionProcessor(&CompactionProcessorOptions{ MinimumGroupSize: 5, MaximumMutationPoolBatch: 15, - }, + }), }, // This rule should effectively be ignored. curationState{ fingerprint: "0002-A-2-Z", - processor: &CompactionProcessor{ + processor: NewCompactionProcessor(&CompactionProcessorOptions{ MinimumGroupSize: 2, MaximumMutationPoolBatch: 15, - }, + }), ignoreYoungerThan: 30 * time.Minute, lastCurated: testInstant.Add(-1 * 90 * time.Minute), }, @@ -553,28 +553,28 @@ func TestCuratorCompactionProcessor(t *testing.T) { fingerprint: "0001-A-1-Z", ignoreYoungerThan: time.Hour, lastCurated: testInstant.Add(-1 * 30 * time.Minute), - processor: &CompactionProcessor{ + processor: NewCompactionProcessor(&CompactionProcessorOptions{ MinimumGroupSize: 5, MaximumMutationPoolBatch: 15, - }, + }), }, { fingerprint: "0002-A-2-Z", ignoreYoungerThan: 30 * time.Minute, lastCurated: testInstant.Add(-1 * 90 * time.Minute), - processor: &CompactionProcessor{ + processor: NewCompactionProcessor(&CompactionProcessorOptions{ MinimumGroupSize: 2, MaximumMutationPoolBatch: 15, - }, + }), }, { fingerprint: "0002-A-2-Z", ignoreYoungerThan: time.Hour, lastCurated: testInstant.Add(-1 * 60 * time.Minute), - processor: &CompactionProcessor{ + processor: NewCompactionProcessor(&CompactionProcessorOptions{ MinimumGroupSize: 5, MaximumMutationPoolBatch: 15, - }, + }), }, }, sampleGroups: []sampleGroup{ @@ -881,16 +881,20 @@ func TestCuratorCompactionProcessor(t *testing.T) { stop := make(chan bool) defer close(stop) - c := Curator{ + c := NewCurator(&CuratorOptions{ Stop: stop, - } + }) + defer c.Close() err = c.Run(scenario.in.ignoreYoungerThan, testInstant, scenario.in.processor, curatorStates, samples, watermarkStates, updates) if err != nil { t.Fatal(err) } - iterator := curatorStates.p.NewIterator(true) + iterator, err := curatorStates.p.NewIterator(true) + if err != nil { + t.Fatal(err) + } defer iterator.Close() for j, expected := range scenario.out.curationStates { @@ -905,9 +909,8 @@ func TestCuratorCompactionProcessor(t *testing.T) { } } - curationKeyDto := &dto.CurationKey{} - - err = proto.Unmarshal(iterator.Key(), curationKeyDto) + curationKeyDto := new(dto.CurationKey) + err = 
iterator.Key(curationKeyDto) if err != nil { t.Fatalf("%d.%d. could not unmarshal: %s", i, j, err) } @@ -938,7 +941,10 @@ func TestCuratorCompactionProcessor(t *testing.T) { } } - iterator = samples.NewIterator(true) + iterator, err = samples.NewIterator(true) + if err != nil { + t.Fatal(err) + } defer iterator.Close() for j, expected := range scenario.out.sampleGroups { @@ -1004,9 +1010,9 @@ func TestCuratorDeletionProcessor(t *testing.T) { }{ { in: in{ - processor: &DeletionProcessor{ + processor: NewDeletionProcessor(&DeletionProcessorOptions{ MaximumMutationPoolBatch: 15, - }, + }), ignoreYoungerThan: 1 * time.Hour, groupSize: 5, curationStates: fixture.Pairs{ @@ -1014,17 +1020,17 @@ func TestCuratorDeletionProcessor(t *testing.T) { fingerprint: "0001-A-1-Z", ignoreYoungerThan: 1 * time.Hour, lastCurated: testInstant.Add(-1 * 90 * time.Minute), - processor: &DeletionProcessor{ + processor: NewDeletionProcessor(&DeletionProcessorOptions{ MaximumMutationPoolBatch: 15, - }, + }), }, curationState{ fingerprint: "0002-A-2-Z", ignoreYoungerThan: 1 * time.Hour, lastCurated: testInstant.Add(-1 * 90 * time.Minute), - processor: &DeletionProcessor{ + processor: NewDeletionProcessor(&DeletionProcessorOptions{ MaximumMutationPoolBatch: 15, - }, + }), }, }, watermarkStates: fixture.Pairs{ @@ -1317,17 +1323,17 @@ func TestCuratorDeletionProcessor(t *testing.T) { fingerprint: "0001-A-1-Z", ignoreYoungerThan: 1 * time.Hour, lastCurated: testInstant.Add(-1 * 30 * time.Minute), - processor: &DeletionProcessor{ + processor: NewDeletionProcessor(&DeletionProcessorOptions{ MaximumMutationPoolBatch: 15, - }, + }), }, { fingerprint: "0002-A-2-Z", ignoreYoungerThan: 1 * time.Hour, lastCurated: testInstant.Add(-1 * 60 * time.Minute), - processor: &DeletionProcessor{ + processor: NewDeletionProcessor(&DeletionProcessorOptions{ MaximumMutationPoolBatch: 15, - }, + }), }, }, sampleGroups: []sampleGroup{ @@ -1404,16 +1410,20 @@ func TestCuratorDeletionProcessor(t *testing.T) { stop := make(chan bool) defer close(stop) - c := Curator{ + c := NewCurator(&CuratorOptions{ Stop: stop, - } + }) + defer c.Close() err = c.Run(scenario.in.ignoreYoungerThan, testInstant, scenario.in.processor, curatorStates, samples, watermarkStates, updates) if err != nil { t.Fatal(err) } - iterator := curatorStates.p.NewIterator(true) + iterator, err := curatorStates.p.NewIterator(true) + if err != nil { + t.Fatal(err) + } defer iterator.Close() for j, expected := range scenario.out.curationStates { @@ -1429,9 +1439,7 @@ func TestCuratorDeletionProcessor(t *testing.T) { } curationKeyDto := new(dto.CurationKey) - - err = proto.Unmarshal(iterator.Key(), curationKeyDto) - if err != nil { + if err := iterator.Key(curationKeyDto); err != nil { t.Fatalf("%d.%d. could not unmarshal: %s", i, j, err) } @@ -1463,7 +1471,10 @@ func TestCuratorDeletionProcessor(t *testing.T) { } } - iterator = samples.NewIterator(true) + iterator, err = samples.NewIterator(true) + if err != nil { + t.Fatal(err) + } defer iterator.Close() for j, expected := range scenario.out.sampleGroups { diff --git a/storage/metric/samplekey.go b/storage/metric/samplekey.go index e76133fba..75d9cda99 100644 --- a/storage/metric/samplekey.go +++ b/storage/metric/samplekey.go @@ -35,6 +35,21 @@ type SampleKey struct { SampleCount uint32 } +// Constrain merges the underlying SampleKey to fit within the keyspace of +// the provided first and last keys and returns whether the key was modified. 
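+// A key before first is clamped forward to first; a key after last is
+// clamped back to last.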
+func (s *SampleKey) Constrain(first, last *SampleKey) bool { + switch { + case s.Before(first.Fingerprint, first.FirstTimestamp): + *s = *first + return true + case last.Before(s.Fingerprint, s.FirstTimestamp): + *s = *last + return true + default: + return false + } +} + func (s *SampleKey) Equal(o *SampleKey) bool { if s == o { return true diff --git a/storage/metric/stochastic_test.go b/storage/metric/stochastic_test.go index 372f4a0e4..dd019710b 100644 --- a/storage/metric/stochastic_test.go +++ b/storage/metric/stochastic_test.go @@ -24,7 +24,6 @@ import ( clientmodel "github.com/prometheus/client_golang/model" - "github.com/prometheus/prometheus/coding" "github.com/prometheus/prometheus/coding/indexable" "github.com/prometheus/prometheus/utility/test" @@ -198,12 +197,13 @@ func levelDBGetRangeValues(l *LevelDBMetricPersistence, fp *clientmodel.Fingerpr Timestamp: indexable.EncodeTime(i.OldestInclusive), } - e := coding.NewPBEncoder(k).MustEncode() - - iterator := l.MetricSamples.NewIterator(true) + iterator, err := l.MetricSamples.NewIterator(true) + if err != nil { + panic(err) + } defer iterator.Close() - for valid := iterator.Seek(e); valid; valid = iterator.Next() { + for valid := iterator.Seek(k); valid; valid = iterator.Next() { retrievedKey, err := extractSampleKey(iterator) if err != nil { return samples, err diff --git a/storage/metric/tiered.go b/storage/metric/tiered.go index 350e81cb8..0197fe96e 100644 --- a/storage/metric/tiered.go +++ b/storage/metric/tiered.go @@ -23,12 +23,9 @@ import ( clientmodel "github.com/prometheus/client_golang/model" - "github.com/prometheus/prometheus/coding" "github.com/prometheus/prometheus/stats" "github.com/prometheus/prometheus/storage/raw/leveldb" "github.com/prometheus/prometheus/utility" - - dto "github.com/prometheus/prometheus/model/generated" ) type chunk Values @@ -95,6 +92,9 @@ type TieredStorage struct { Indexer MetricIndexer flushSema chan bool + + dtoSampleKeys *dtoSampleKeyList + sampleKeys *sampleKeyList } // viewJob encapsulates a request to extract sample values from the datastore. @@ -140,6 +140,9 @@ func NewTieredStorage(appendToDiskQueueDepth, viewQueueDepth uint, flushMemoryIn wmCache: wmCache, flushSema: make(chan bool, 1), + + dtoSampleKeys: newDtoSampleKeyList(10), + sampleKeys: newSampleKeyList(10), } for i := 0; i < tieredMemorySemaphores; i++ { @@ -323,6 +326,9 @@ func (t *TieredStorage) close() { close(t.ViewQueue) t.wmCache.Clear() + t.dtoSampleKeys.Close() + t.sampleKeys.Close() + t.state = tieredStorageStopping } @@ -379,7 +385,15 @@ func (t *TieredStorage) renderView(viewJob viewJob) { var iterator leveldb.Iterator diskPresent := true - var firstBlock, lastBlock *SampleKey + + firstBlock, _ := t.sampleKeys.Get() + defer t.sampleKeys.Give(firstBlock) + + lastBlock, _ := t.sampleKeys.Get() + defer t.sampleKeys.Give(lastBlock) + + sampleKeyDto, _ := t.dtoSampleKeys.Get() + defer t.dtoSampleKeys.Give(sampleKeyDto) extractionTimer := viewJob.stats.GetTimer(stats.ViewDataExtractionTime).Start() for _, scanJob := range scans { @@ -410,14 +424,23 @@ func (t *TieredStorage) renderView(viewJob viewJob) { if iterator == nil { // Get a single iterator that will be used for all data extraction // below. 
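+			// It is created lazily, so view jobs that are satisfied entirely
+			// from memory never open a LevelDB iterator at all.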
- iterator = t.DiskStorage.MetricSamples.NewIterator(true) + iterator, _ = t.DiskStorage.MetricSamples.NewIterator(true) defer iterator.Close() if diskPresent = iterator.SeekToLast(); diskPresent { - lastBlock, _ = extractSampleKey(iterator) + if err := iterator.Key(sampleKeyDto); err != nil { + panic(err) + } + + lastBlock.Load(sampleKeyDto) + if !iterator.SeekToFirst() { diskPresent = false } else { - firstBlock, _ = extractSampleKey(iterator) + if err := iterator.Key(sampleKeyDto); err != nil { + panic(err) + } + + firstBlock.Load(sampleKeyDto) } } } @@ -482,7 +505,10 @@ func (t *TieredStorage) renderView(viewJob viewJob) { for _, op := range standingOps { if !op.Consumed() { filteredOps = append(filteredOps, op) + continue } + + giveBackOp(op) } standingOps = filteredOps @@ -515,9 +541,10 @@ func (t *TieredStorage) loadChunkAroundTime(iterator leveldb.Iterator, fingerpri return nil, true } - seekingKey := &SampleKey{ - Fingerprint: fingerprint, - } + seekingKey, _ := t.sampleKeys.Get() + defer t.sampleKeys.Give(seekingKey) + + seekingKey.Fingerprint = fingerprint if fingerprint.Equal(firstBlock.Fingerprint) && ts.Before(firstBlock.FirstTimestamp) { seekingKey.FirstTimestamp = firstBlock.FirstTimestamp @@ -527,21 +554,22 @@ func (t *TieredStorage) loadChunkAroundTime(iterator leveldb.Iterator, fingerpri seekingKey.FirstTimestamp = ts } - fd := new(dto.SampleKey) - seekingKey.Dump(fd) + dto, _ := t.dtoSampleKeys.Get() + defer t.dtoSampleKeys.Give(dto) - // Try seeking to target key. - rawKey := coding.NewPBEncoder(fd).MustEncode() - if !iterator.Seek(rawKey) { + seekingKey.Dump(dto) + if !iterator.Seek(dto) { return chunk, true } - var foundKey *SampleKey var foundValues Values - foundKey, _ = extractSampleKey(iterator) + if err := iterator.Key(dto); err != nil { + panic(err) + } + seekingKey.Load(dto) - if foundKey.Fingerprint.Equal(fingerprint) { + if seekingKey.Fingerprint.Equal(fingerprint) { // Figure out if we need to rewind by one block. // Imagine the following supertime blocks with time ranges: // @@ -553,16 +581,19 @@ func (t *TieredStorage) loadChunkAroundTime(iterator leveldb.Iterator, fingerpri // iterator seek behavior. // // Only do the rewind if there is another chunk before this one. 
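+			// (That is, only if the key under the cursor is not already the
+			// first block for this fingerprint.)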
- if !foundKey.MayContain(ts) { + if !seekingKey.MayContain(ts) { postValues, _ := extractSampleValues(iterator) - if !foundKey.Equal(firstBlock) { + if !seekingKey.Equal(firstBlock) { if !iterator.Previous() { panic("This should never return false.") } - foundKey, _ = extractSampleKey(iterator) + if err := iterator.Key(dto); err != nil { + panic(err) + } + seekingKey.Load(dto) - if !foundKey.Fingerprint.Equal(fingerprint) { + if !seekingKey.Fingerprint.Equal(fingerprint) { return postValues, false } @@ -576,15 +607,18 @@ func (t *TieredStorage) loadChunkAroundTime(iterator leveldb.Iterator, fingerpri return foundValues, false } - if fingerprint.Less(foundKey.Fingerprint) { - if !foundKey.Equal(firstBlock) { + if fingerprint.Less(seekingKey.Fingerprint) { + if !seekingKey.Equal(firstBlock) { if !iterator.Previous() { panic("This should never return false.") } - foundKey, _ = extractSampleKey(iterator) + if err := iterator.Key(dto); err != nil { + panic(err) + } + seekingKey.Load(dto) - if !foundKey.Fingerprint.Equal(fingerprint) { + if !seekingKey.Fingerprint.Equal(fingerprint) { return nil, false } diff --git a/storage/metric/view.go b/storage/metric/view.go index f2e1193e3..d40c3e318 100644 --- a/storage/metric/view.go +++ b/storage/metric/view.go @@ -48,40 +48,48 @@ func NewViewRequestBuilder() *viewRequestBuilder { } } +var getValuesAtTimes = newValueAtTimeList(10 * 1024) + // Gets for the given Fingerprint either the value at that time if there is an // match or the one or two values adjacent thereto. func (v *viewRequestBuilder) GetMetricAtTime(fingerprint *clientmodel.Fingerprint, time time.Time) { ops := v.operations[*fingerprint] - ops = append(ops, &getValuesAtTimeOp{ - time: time, - }) + op, _ := getValuesAtTimes.Get() + op.time = time + ops = append(ops, op) v.operations[*fingerprint] = ops } +var getValuesAtIntervals = newValueAtIntervalList(10 * 1024) + // Gets for the given Fingerprint either the value at that interval from From // through Through if there is an match or the one or two values adjacent // for each point. func (v *viewRequestBuilder) GetMetricAtInterval(fingerprint *clientmodel.Fingerprint, from, through time.Time, interval time.Duration) { ops := v.operations[*fingerprint] - ops = append(ops, &getValuesAtIntervalOp{ - from: from, - through: through, - interval: interval, - }) + op, _ := getValuesAtIntervals.Get() + op.from = from + op.through = through + op.interval = interval + ops = append(ops, op) v.operations[*fingerprint] = ops } +var getValuesAlongRanges = newValueAlongRangeList(10 * 1024) + // Gets for the given Fingerprint the values that occur inclusively from From // through Through. 
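+// The op is leased from the free list above and handed back via giveBackOp
+// once renderView has consumed it.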
func (v *viewRequestBuilder) GetMetricRange(fingerprint *clientmodel.Fingerprint, from, through time.Time) { ops := v.operations[*fingerprint] - ops = append(ops, &getValuesAlongRangeOp{ - from: from, - through: through, - }) + op, _ := getValuesAlongRanges.Get() + op.from = from + op.through = through + ops = append(ops, op) v.operations[*fingerprint] = ops } +var getValuesAtIntervalAlongRanges = newValueAtIntervalAlongRangeList(10 * 1024) + // Gets value ranges at intervals for the given Fingerprint: // // |----| |----| |----| |----| @@ -90,13 +98,13 @@ func (v *viewRequestBuilder) GetMetricRange(fingerprint *clientmodel.Fingerprint // from interval rangeDuration through func (v *viewRequestBuilder) GetMetricRangeAtInterval(fingerprint *clientmodel.Fingerprint, from, through time.Time, interval, rangeDuration time.Duration) { ops := v.operations[*fingerprint] - ops = append(ops, &getValueRangeAtIntervalOp{ - rangeFrom: from, - rangeThrough: from.Add(rangeDuration), - rangeDuration: rangeDuration, - interval: interval, - through: through, - }) + op, _ := getValuesAtIntervalAlongRanges.Get() + op.rangeFrom = from + op.rangeThrough = from.Add(rangeDuration) + op.rangeDuration = rangeDuration + op.interval = interval + op.through = through + ops = append(ops, op) v.operations[*fingerprint] = ops } @@ -134,3 +142,18 @@ func (v view) appendSamples(fingerprint *clientmodel.Fingerprint, samples Values func newView() view { return view{NewMemorySeriesStorage(MemorySeriesOptions{})} } + +func giveBackOp(op interface{}) bool { + switch v := op.(type) { + case *getValuesAtTimeOp: + return getValuesAtTimes.Give(v) + case *getValuesAtIntervalOp: + return getValuesAtIntervals.Give(v) + case *getValuesAlongRangeOp: + return getValuesAlongRanges.Give(v) + case *getValueRangeAtIntervalOp: + return getValuesAtIntervalAlongRanges.Give(v) + default: + panic("unrecognized operation") + } +} diff --git a/storage/metric/watermark.go b/storage/metric/watermark.go index cf1fc69f3..88bfbd0b5 100644 --- a/storage/metric/watermark.go +++ b/storage/metric/watermark.go @@ -67,7 +67,7 @@ func (w *LevelDBHighWatermarker) Get(f *clientmodel.Fingerprint) (t time.Time, o return t, ok, err } if !ok { - return t, ok, err + return t, ok, nil } t = time.Unix(v.GetTimestamp(), 0) return t, true, nil diff --git a/storage/raw/interface.go b/storage/raw/interface.go index 25f226803..1f8bbe952 100644 --- a/storage/raw/interface.go +++ b/storage/raw/interface.go @@ -38,7 +38,7 @@ type Persistence interface { // Close reaps all of the underlying system resources associated with this // persistence. - Close() + Close() error // Has informs the user whether a given key exists in the database. 
Has(key proto.Message) (bool, error) // Get retrieves the key from the database if it exists or returns nil if diff --git a/storage/raw/leveldb/batch.go b/storage/raw/leveldb/batch.go index ed5449788..87fe89f7f 100644 --- a/storage/raw/leveldb/batch.go +++ b/storage/raw/leveldb/batch.go @@ -18,8 +18,6 @@ import ( "code.google.com/p/goprotobuf/proto" "github.com/jmhodges/levigo" - - "github.com/prometheus/prometheus/coding" ) type batch struct { @@ -35,13 +33,34 @@ func NewBatch() *batch { } func (b *batch) Drop(key proto.Message) { - b.batch.Delete(coding.NewPBEncoder(key).MustEncode()) + buf, _ := buffers.Get() + defer buffers.Give(buf) + + if err := buf.Marshal(key); err != nil { + panic(err) + } + + b.batch.Delete(buf.Bytes()) b.drops++ } func (b *batch) Put(key, value proto.Message) { - b.batch.Put(coding.NewPBEncoder(key).MustEncode(), coding.NewPBEncoder(value).MustEncode()) + keyBuf, _ := buffers.Get() + defer buffers.Give(keyBuf) + + if err := keyBuf.Marshal(key); err != nil { + panic(err) + } + + valBuf, _ := buffers.Get() + defer buffers.Give(valBuf) + + if err := valBuf.Marshal(value); err != nil { + panic(err) + } + + b.batch.Put(keyBuf.Bytes(), valBuf.Bytes()) b.puts++ diff --git a/storage/raw/leveldb/freelist.go b/storage/raw/leveldb/freelist.go new file mode 100644 index 000000000..5e27b1f86 --- /dev/null +++ b/storage/raw/leveldb/freelist.go @@ -0,0 +1,46 @@ +// Copyright 2013 Prometheus Team +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package leveldb + +import ( + "github.com/prometheus/prometheus/utility" + + "code.google.com/p/goprotobuf/proto" +) + +var buffers = newBufferList(50) + +type bufferList struct { + l utility.FreeList +} + +func (l *bufferList) Get() (*proto.Buffer, bool) { + if v, ok := l.l.Get(); ok { + return v.(*proto.Buffer), ok + } + + return proto.NewBuffer(make([]byte, 0, 4096)), false +} + +func (l *bufferList) Give(v *proto.Buffer) bool { + v.Reset() + + return l.l.Give(v) +} + +func newBufferList(cap int) *bufferList { + return &bufferList{ + l: utility.NewFreeList(cap), + } +} diff --git a/storage/raw/leveldb/iterator.go b/storage/raw/leveldb/iterator.go index b352eda16..9c9b25a22 100644 --- a/storage/raw/leveldb/iterator.go +++ b/storage/raw/leveldb/iterator.go @@ -13,30 +13,29 @@ package leveldb +import ( + "code.google.com/p/goprotobuf/proto" +) + // TODO: Evaluate whether to use coding.Encoder for the key and values instead // raw bytes for consistency reasons. -// Iterator wraps Levigo and LevelDB's iterator behaviors in a manner that is -// conducive to IO-free testing. -// -// It borrows some of the operational assumptions from goskiplist, which -// functions very similarly, in that it uses no separate Valid method to -// determine health. All methods that have a return signature of (ok bool) -// assume in the real LevelDB case that if ok == false that the iterator -// must be disposed of at this given instance and recreated if future -// work is desired. This is a quirk of LevelDB itself! 
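+// Iterator provides method-based access to an underlying LevelDB keyspace,
+// marshalling keys and values directly to and from protocol buffer messages.
+// Unlike its predecessor, it exposes an explicit Valid method rather than
+// relying solely on the boolean results of cursor movements.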
type Iterator interface { - // GetError reports low-level errors, if available. This should not indicate - // that the iterator is necessarily unhealthy but maybe that the underlying - // table is corrupted itself. See the notes above for (ok bool) return - // signatures to determine iterator health. - GetError() error - Key() []byte - Next() (ok bool) - Previous() (ok bool) - Seek(key []byte) (ok bool) - SeekToFirst() (ok bool) - SeekToLast() (ok bool) - Value() []byte - Close() + Error() error + Valid() bool + + SeekToFirst() bool + SeekToLast() bool + Seek(proto.Message) bool + + Next() bool + Previous() bool + + Key(proto.Message) error + Value(proto.Message) error + + Close() error + + rawKey() []byte + rawValue() []byte } diff --git a/storage/raw/leveldb/leveldb.go b/storage/raw/leveldb/leveldb.go index 0bd25a54c..f206a4ab4 100644 --- a/storage/raw/leveldb/leveldb.go +++ b/storage/raw/leveldb/leveldb.go @@ -20,7 +20,6 @@ import ( "code.google.com/p/goprotobuf/proto" "github.com/jmhodges/levigo" - "github.com/prometheus/prometheus/coding" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/storage/raw" ) @@ -84,9 +83,9 @@ func (i levigoIterator) String() string { return fmt.Sprintf("levigoIterator created at %s that is %s and %s and %s", i.creationTime, open, valid, snapshotted) } -func (i *levigoIterator) Close() { +func (i *levigoIterator) Close() error { if i.closed { - return + return nil } if i.iterator != nil { @@ -108,11 +107,18 @@ func (i *levigoIterator) Close() { i.closed = true i.valid = false - return + return nil } -func (i *levigoIterator) Seek(key []byte) bool { - i.iterator.Seek(key) +func (i *levigoIterator) Seek(m proto.Message) bool { + buf, _ := buffers.Get() + defer buffers.Give(buf) + + if err := buf.Marshal(m); err != nil { + panic(err) + } + + i.iterator.Seek(buf.Bytes()) i.valid = i.iterator.Valid() @@ -151,18 +157,40 @@ func (i *levigoIterator) Previous() bool { return i.valid } -func (i levigoIterator) Key() (key []byte) { +func (i *levigoIterator) rawKey() (key []byte) { return i.iterator.Key() } -func (i levigoIterator) Value() (value []byte) { +func (i *levigoIterator) rawValue() (value []byte) { return i.iterator.Value() } -func (i levigoIterator) GetError() (err error) { +func (i *levigoIterator) Error() (err error) { return i.iterator.GetError() } +func (i *levigoIterator) Key(m proto.Message) error { + buf, _ := buffers.Get() + defer buffers.Give(buf) + + buf.SetBuf(i.iterator.Key()) + + return buf.Unmarshal(m) +} + +func (i *levigoIterator) Value(m proto.Message) error { + buf, _ := buffers.Get() + defer buffers.Give(buf) + + buf.SetBuf(i.iterator.Value()) + + return buf.Unmarshal(m) +} + +func (i *levigoIterator) Valid() bool { + return i.valid +} + type Compression uint const ( @@ -229,7 +257,7 @@ func NewLevelDBPersistence(o LevelDBOptions) (*LevelDBPersistence, error) { }, nil } -func (l *LevelDBPersistence) Close() { +func (l *LevelDBPersistence) Close() error { // These are deferred to take advantage of forced closing in case of stack // unwinding due to anomalies. 
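+	// Each resource is torn down in its own deferred closure so that a panic
+	// in one teardown cannot keep the remaining resources from being released.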
defer func() { @@ -268,11 +296,18 @@ func (l *LevelDBPersistence) Close() { } }() - return + return nil } func (l *LevelDBPersistence) Get(k, v proto.Message) (bool, error) { - raw, err := l.storage.Get(l.readOptions, coding.NewPBEncoder(k).MustEncode()) + buf, _ := buffers.Get() + defer buffers.Give(buf) + + if err := buf.Marshal(k); err != nil { + panic(err) + } + + raw, err := l.storage.Get(l.readOptions, buf.Bytes()) if err != nil { return false, err } @@ -284,8 +319,9 @@ func (l *LevelDBPersistence) Get(k, v proto.Message) (bool, error) { return true, nil } - err = proto.Unmarshal(raw, v) - if err != nil { + buf.SetBuf(raw) + + if err := buf.Unmarshal(v); err != nil { return true, err } @@ -297,11 +333,32 @@ func (l *LevelDBPersistence) Has(k proto.Message) (has bool, err error) { } func (l *LevelDBPersistence) Drop(k proto.Message) error { - return l.storage.Delete(l.writeOptions, coding.NewPBEncoder(k).MustEncode()) + buf, _ := buffers.Get() + defer buffers.Give(buf) + + if err := buf.Marshal(k); err != nil { + panic(err) + } + + return l.storage.Delete(l.writeOptions, buf.Bytes()) } func (l *LevelDBPersistence) Put(key, value proto.Message) error { - return l.storage.Put(l.writeOptions, coding.NewPBEncoder(key).MustEncode(), coding.NewPBEncoder(value).MustEncode()) + keyBuf, _ := buffers.Get() + defer buffers.Give(keyBuf) + + if err := keyBuf.Marshal(key); err != nil { + panic(err) + } + + valBuf, _ := buffers.Get() + defer buffers.Give(valBuf) + + if err := valBuf.Marshal(value); err != nil { + panic(err) + } + + return l.storage.Put(l.writeOptions, keyBuf.Bytes(), valBuf.Bytes()) } func (l *LevelDBPersistence) Commit(b raw.Batch) (err error) { @@ -333,7 +390,10 @@ func (l *LevelDBPersistence) Prune() { } func (l *LevelDBPersistence) Size() (uint64, error) { - iterator := l.NewIterator(false) + iterator, err := l.NewIterator(false) + if err != nil { + return 0, err + } defer iterator.Close() if !iterator.SeekToFirst() { @@ -342,13 +402,13 @@ func (l *LevelDBPersistence) Size() (uint64, error) { keyspace := levigo.Range{} - keyspace.Start = iterator.Key() + keyspace.Start = iterator.rawKey() if !iterator.SeekToLast() { return 0, fmt.Errorf("could not seek to last key") } - keyspace.Limit = iterator.Key() + keyspace.Limit = iterator.rawKey() sizes := l.storage.GetApproximateSizes([]levigo.Range{keyspace}) total := uint64(0) @@ -374,7 +434,7 @@ func (l *LevelDBPersistence) Size() (uint64, error) { // will be leaked. // // The iterator is optionally snapshotable. 
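+// A snapshotted iterator reads from a consistent point-in-time view of the
+// keyspace taken at creation time; writes made afterwards are invisible to it.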
-func (l *LevelDBPersistence) NewIterator(snapshotted bool) Iterator { +func (l *LevelDBPersistence) NewIterator(snapshotted bool) (Iterator, error) { var ( snapshot *levigo.Snapshot readOptions *levigo.ReadOptions @@ -396,27 +456,27 @@ func (l *LevelDBPersistence) NewIterator(snapshotted bool) Iterator { readOptions: readOptions, snapshot: snapshot, storage: l.storage, - } + }, nil } func (l *LevelDBPersistence) ForEach(decoder storage.RecordDecoder, filter storage.RecordFilter, operator storage.RecordOperator) (scannedEntireCorpus bool, err error) { - var ( - iterator = l.NewIterator(true) - valid bool - ) + iterator, err := l.NewIterator(true) + if err != nil { + return false, err + } + defer iterator.Close() - for valid = iterator.SeekToFirst(); valid; valid = iterator.Next() { - err = iterator.GetError() - if err != nil { - return + for valid := iterator.SeekToFirst(); valid; valid = iterator.Next() { + if err = iterator.Error(); err != nil { + return false, err } - decodedKey, decodeErr := decoder.DecodeKey(iterator.Key()) + decodedKey, decodeErr := decoder.DecodeKey(iterator.rawKey()) if decodeErr != nil { continue } - decodedValue, decodeErr := decoder.DecodeValue(iterator.Value()) + decodedValue, decodeErr := decoder.DecodeValue(iterator.rawValue()) if decodeErr != nil { continue } @@ -436,6 +496,5 @@ func (l *LevelDBPersistence) ForEach(decoder storage.RecordDecoder, filter stora } } } - scannedEntireCorpus = true - return + return true, nil } diff --git a/utility/freelist.go b/utility/freelist.go new file mode 100644 index 000000000..4c3d02c09 --- /dev/null +++ b/utility/freelist.go @@ -0,0 +1,45 @@ +// Copyright 2013 Prometheus Team +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+
+package utility
+
+type FreeList chan interface{}
+
+func NewFreeList(cap int) FreeList {
+	return make(FreeList, cap)
+}
+
+func (l FreeList) Get() (interface{}, bool) {
+	select {
+	case v := <-l:
+		return v, true
+	default:
+		return nil, false
+	}
+}
+
+func (l FreeList) Give(v interface{}) bool {
+	select {
+	case l <- v:
+		return true
+	default:
+		return false
+	}
+}
+
+func (l FreeList) Close() {
+	close(l)
+
+	for _ = range l {
+	}
+}
diff --git a/web/web.go b/web/web.go
index be6593849..53ce2f288 100644
--- a/web/web.go
+++ b/web/web.go
@@ -21,6 +21,9 @@ import (
 	"net/http"
 	"net/http/pprof"
 	"os"
+	"time"
+
+	pprof_runtime "runtime/pprof"
 
 	"code.google.com/p/gorest"
 	"github.com/golang/glog"
@@ -63,6 +66,7 @@ func (w WebService) ServeForever() error {
 	exp.Handle("/databases", w.DatabasesHandler)
 	exp.Handle("/alerts", w.AlertsHandler)
 	exp.HandleFunc("/graph", graphHandler)
+	exp.HandleFunc("/heap", dumpHeap)
 
 	exp.Handle("/api/", compressionHandler{handler: gorest.Handle()})
 	exp.Handle("/metrics", prometheus.DefaultHandler)
@@ -139,6 +143,18 @@ func executeTemplate(w http.ResponseWriter, name string, data interface{}) {
 	}
 }
 
+func dumpHeap(w http.ResponseWriter, r *http.Request) {
+	target := fmt.Sprintf("/tmp/%d.heap", time.Now().Unix())
+	f, err := os.Create(target)
+	if err != nil {
+		glog.Error("Could not dump heap: ", err)
+		http.Error(w, err.Error(), http.StatusInternalServerError)
+		return
+	}
+	defer f.Close()
+	fmt.Fprintf(w, "Writing to %s...", target)
+	pprof_runtime.WriteHeapProfile(f)
+	fmt.Fprint(w, "Done")
+}
+
 func MustBuildServerUrl() string {
 	_, port, err := net.SplitHostPort(*listenAddress)
 	if err != nil {
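Every free list introduced in this change follows the same shape: a typed wrapper around utility.FreeList whose Get falls back to a fresh allocation when the pool is empty, whose Give resets the object before pooling it, and whose Close drains the channel. A minimal, self-contained sketch of that pattern under illustrative names (pooledBuffer and bufferList are stand-ins, not types from this tree):

```go
package main

import "fmt"

// FreeList mirrors utility/freelist.go: a bounded channel used as an
// allocation pool.
type FreeList chan interface{}

func NewFreeList(cap int) FreeList {
	return make(FreeList, cap)
}

// Get returns a pooled value when one is available; the boolean reports
// whether the value was recycled.
func (l FreeList) Get() (interface{}, bool) {
	select {
	case v := <-l:
		return v, true
	default:
		return nil, false
	}
}

// Give returns a value to the pool, dropping it when the pool is full.
func (l FreeList) Give(v interface{}) bool {
	select {
	case l <- v:
		return true
	default:
		return false
	}
}

// pooledBuffer is a hypothetical reusable object, standing in for
// dto.SampleKey or proto.Buffer in the real tree.
type pooledBuffer struct {
	data []byte
}

// bufferList is a typed wrapper in the style of dtoSampleKeyList et al.
type bufferList struct {
	l FreeList
}

func (b *bufferList) Get() (*pooledBuffer, bool) {
	if v, ok := b.l.Get(); ok {
		return v.(*pooledBuffer), true
	}
	// Pool empty: fall back to a fresh allocation.
	return &pooledBuffer{data: make([]byte, 0, 4096)}, false
}

func (b *bufferList) Give(v *pooledBuffer) bool {
	// Reset before pooling so no stale state leaks into the next lease.
	v.data = v.data[:0]
	return b.l.Give(v)
}

func main() {
	pool := &bufferList{l: NewFreeList(2)}

	buf, recycled := pool.Get()
	fmt.Println(recycled) // false: freshly allocated
	buf.data = append(buf.data, 'x')
	pool.Give(buf)

	buf, recycled = pool.Get()
	fmt.Println(recycled, len(buf.data)) // true 0: recycled and reset
}
```

A buffered channel is the natural pooling primitive for Go of this era; sync.Pool did not arrive until Go 1.3.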