prometheus/storage/metric/processor.go
Matt T. Proud 4298bab2b0 Publicize Curator and Processors.
This commit publicizes the curation and processor frameworks for
purposes of making them available in the main processor loop.
2013-05-02 12:37:24 +02:00

234 lines
7.7 KiB
Go

// Copyright 2013 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package metric
import (
"code.google.com/p/goprotobuf/proto"
"fmt"
"github.com/prometheus/prometheus/coding"
"github.com/prometheus/prometheus/model"
dto "github.com/prometheus/prometheus/model/generated"
"github.com/prometheus/prometheus/storage/raw"
"github.com/prometheus/prometheus/storage/raw/leveldb"
"time"
)
// Processor models a post-processing agent that performs work given a sample
// corpus.
type Processor interface {
// Name emits the name of this processor's signature encoder. It must be
// fully-qualified in the sense that it could be used via a Protocol Buffer
// registry to extract the descriptor to reassemble this message.
Name() string
// Signature emits a byte signature for this processor for the purpose of
// recording how far along it has been applied to the database.
Signature() (signature []byte, err error)
// Apply runs this processor against the sample set. sampleIterator is
// expected to be pre-seeked to the initial starting position. The processor
// will run until stopAt has been reached. It is imperative that the
// provided stopAt is within the interval of the series frontier.
//
// Upon completion or error, the last time at which the processor finished
// shall be emitted in addition to any errors.
Apply(sampleIterator leveldb.Iterator, samples raw.Persistence, stopAt time.Time, fingerprint model.Fingerprint) (lastCurated time.Time, err error)
}
// CompactionProcessor combines sparse values in the database together such
// that at least MinimumGroupSize-sized chunks are grouped together.
type CompactionProcessor struct {
// MaximumMutationPoolBatch represents approximately the largest pending
// batch of mutation operations for the database before pausing to
// commit before resumption.
//
// A reasonable value would be (MinimumGroupSize * 2) + 1.
MaximumMutationPoolBatch int
// MinimumGroupSize represents the smallest allowed sample chunk size in the
// database.
MinimumGroupSize int
// signature is the byte representation of the CompactionProcessor's
// settings. It is used purely for memoization within a single instance.
signature []byte
}
// Name returns the fully-qualified name of the Protocol Buffer message that
// encodes this processor's signature.
func (p CompactionProcessor) Name() string {
	const definitionName = "io.prometheus.CompactionProcessorDefinition"

	return definitionName
}
// Signature returns the marshaled dto.CompactionProcessorDefinition for this
// processor, which is built from MinimumGroupSize. The encoded bytes are
// memoized on the receiver after the first successful marshal and returned
// directly on subsequent calls.
//
// If marshaling fails, the error is returned with a nil signature and nothing
// is memoized, so a later call retries the encoding.
func (p *CompactionProcessor) Signature() (out []byte, err error) {
	if len(p.signature) > 0 {
		return p.signature, nil
	}

	out, err = proto.Marshal(&dto.CompactionProcessorDefinition{
		MinimumGroupSize: proto.Uint32(uint32(p.MinimumGroupSize)),
	})
	if err != nil {
		// Never cache the output of a failed marshal: a non-empty partial
		// buffer would otherwise be served with a nil error next time.
		return nil, err
	}

	p.signature = out

	return out, nil
}
// String renders a short human-readable description of the processor that
// includes its configured minimum group size.
func (p CompactionProcessor) String() string {
	const format = "compactionProcess for minimum group size %d"

	return fmt.Sprintf(format, p.MinimumGroupSize)
}
// Apply regroups the sample chunks of one series into chunks of
// MinimumGroupSize values.
//
// sampleIterator must already be positioned on the first chunk to examine;
// its key and values are read before any work starts, and further chunks are
// pulled with sampleIterator.Next() as the current one is consumed. Mutations
// (drops of consumed chunks and puts of regrouped chunks) are accumulated in a
// batch and committed to samples whenever the number of pending mutations
// reaches MaximumMutationPoolBatch, and once more at the end.
//
// Processing continues while both the last curated time and the timestamp of
// the last value touched are before stopAt. Any values still buffered when
// the loop ends are written out as one final chunk.
//
// lastCurated is the first timestamp (in UTC) of the most recently written
// regrouped chunk, or the zero time if none was written. err reports
// extraction failures, an iterator that cannot advance, or commit failures.
func (p CompactionProcessor) Apply(sampleIterator leveldb.Iterator, samples raw.Persistence, stopAt time.Time, fingerprint model.Fingerprint) (lastCurated time.Time, err error) {
	var pendingBatch raw.Batch

	// The batch is replaced several times below; close whichever one is
	// still open when the function returns.
	defer func() {
		if pendingBatch != nil {
			pendingBatch.Close()
		}
	}()

	var (
		pendingMutations = 0
		pendingSamples   model.Values
		sampleKey        model.SampleKey
		sampleValues     model.Values
		lastTouchedTime  time.Time
		keyDropped       bool
	)

	// dropCurrentKey schedules deletion of the chunk currently being consumed.
	// It acts at most once per chunk; keyDropped is reset when the iterator
	// advances to the next chunk.
	dropCurrentKey := func() {
		if keyDropped {
			return
		}

		pendingBatch.Drop(coding.NewProtocolBuffer(sampleKey.ToDTO()))
		keyDropped = true
	}

	sampleKey, err = extractSampleKey(sampleIterator)
	if err != nil {
		return lastCurated, err
	}

	sampleValues, err = extractSampleValues(sampleIterator)
	if err != nil {
		return lastCurated, err
	}

	for lastCurated.Before(stopAt) && lastTouchedTime.Before(stopAt) {
		switch {
		// Furnish a new pending batch operation if none is available.
		case pendingBatch == nil:
			pendingBatch = leveldb.NewBatch()

		// If there are no sample values to extract from the datastore, let's
		// continue extracting more values to use. We know that the
		// time.Before() block would prevent us from going into unsafe
		// territory.
		case len(sampleValues) == 0:
			if !sampleIterator.Next() {
				return lastCurated, fmt.Errorf("Illegal Condition: Invalid Iterator on Continuation")
			}

			keyDropped = false

			sampleKey, err = extractSampleKey(sampleIterator)
			if err != nil {
				return lastCurated, err
			}

			sampleValues, err = extractSampleValues(sampleIterator)
			if err != nil {
				return lastCurated, err
			}

		// If the number of pending mutations exceeds the allowed batch amount,
		// commit to disk and delete the batch. A new one will be recreated if
		// necessary.
		case pendingMutations >= p.MaximumMutationPoolBatch:
			err = samples.Commit(pendingBatch)
			if err != nil {
				return lastCurated, err
			}

			pendingMutations = 0

			pendingBatch.Close()
			pendingBatch = nil

		// Nothing is buffered and the current chunk is already large enough:
		// leave it untouched in the database and move on.
		case len(pendingSamples) == 0 && len(sampleValues) >= p.MinimumGroupSize:
			lastTouchedTime = sampleValues[len(sampleValues)-1].Timestamp
			sampleValues = model.Values{}

		// The current chunk plus the buffer is still too small: absorb the
		// whole chunk into the buffer and drop its key.
		case len(pendingSamples)+len(sampleValues) < p.MinimumGroupSize:
			dropCurrentKey()

			pendingSamples = append(pendingSamples, sampleValues...)
			lastTouchedTime = sampleValues[len(sampleValues)-1].Timestamp
			sampleValues = model.Values{}
			pendingMutations++

		// If the number of pending writes equals the target group size, write
		// the buffer out as a new chunk and start the next buffer from the
		// remaining values of the current chunk.
		case len(pendingSamples) == p.MinimumGroupSize:
			newSampleKey := pendingSamples.ToSampleKey(fingerprint)
			key := coding.NewProtocolBuffer(newSampleKey.ToDTO())
			value := coding.NewProtocolBuffer(pendingSamples.ToDTO())
			pendingBatch.Put(key, value)
			pendingMutations++
			lastCurated = newSampleKey.FirstTimestamp.In(time.UTC)

			if len(sampleValues) > 0 {
				dropCurrentKey()

				if len(sampleValues) > p.MinimumGroupSize {
					pendingSamples = sampleValues[:p.MinimumGroupSize]
					sampleValues = sampleValues[p.MinimumGroupSize:]
					lastTouchedTime = sampleValues[len(sampleValues)-1].Timestamp
				} else {
					pendingSamples = sampleValues
					lastTouchedTime = pendingSamples[len(pendingSamples)-1].Timestamp
					sampleValues = model.Values{}
				}
			}

		// The buffer can be topped up to exactly the target size from the
		// current chunk; whatever is left stays in sampleValues.
		case len(pendingSamples)+len(sampleValues) >= p.MinimumGroupSize:
			dropCurrentKey()

			remainder := p.MinimumGroupSize - len(pendingSamples)
			pendingSamples = append(pendingSamples, sampleValues[:remainder]...)
			sampleValues = sampleValues[remainder:]

			if len(sampleValues) == 0 {
				lastTouchedTime = pendingSamples[len(pendingSamples)-1].Timestamp
			} else {
				lastTouchedTime = sampleValues[len(sampleValues)-1].Timestamp
			}

			pendingMutations++

		default:
			// No loop state changes in this branch, so continuing would spin
			// forever; surface the error to the caller instead.
			return lastCurated, fmt.Errorf("Unhandled processing case.")
		}
	}

	// Flush whatever is still buffered as one final chunk.
	if len(sampleValues) > 0 || len(pendingSamples) > 0 {
		// The loop may not have executed at all (stopAt not after the zero
		// time), in which case no batch has been created yet.
		if pendingBatch == nil {
			pendingBatch = leveldb.NewBatch()
		}

		// pendingSamples holds values taken from earlier positions than the
		// unconsumed remainder in sampleValues, so it must come first to keep
		// the chunk in order.
		pendingSamples = append(pendingSamples, sampleValues...)

		newSampleKey := pendingSamples.ToSampleKey(fingerprint)
		key := coding.NewProtocolBuffer(newSampleKey.ToDTO())
		value := coding.NewProtocolBuffer(pendingSamples.ToDTO())
		pendingBatch.Put(key, value)
		pendingSamples = model.Values{}
		pendingMutations++
		lastCurated = newSampleKey.FirstTimestamp.In(time.UTC)
	}

	// This is not deferred due to the off-chance that a pre-existing commit
	// failed.
	if pendingBatch != nil && pendingMutations > 0 {
		err = samples.Commit(pendingBatch)
		if err != nil {
			return lastCurated, err
		}
	}

	return lastCurated, err
}