mirror of
https://github.com/prometheus/prometheus.git
synced 2025-03-05 20:59:13 -08:00
Merge pull request #169 from prometheus/feature/storage/compaction
Curator and Compaction Processor
This commit is contained in:
commit
26dbd0776e
|
@ -29,6 +29,8 @@ var (
|
|||
snappyVersion string
|
||||
)
|
||||
|
||||
// BuildInfo encapsulates compile-time metadata about Prometheus made available
|
||||
// via go tool ld such that this can be reported on-demand.
|
||||
var BuildInfo = map[string]string{
|
||||
"version": buildVersion,
|
||||
"branch": buildBranch,
|
||||
|
|
|
@ -22,7 +22,7 @@ include ../Makefile.INCLUDE
|
|||
generated: generated-stamp
|
||||
|
||||
generated-stamp: data.proto
|
||||
protoc --proto_path=$(PREFIX)/include:. --go_out=generated/ --descriptor_set_out=generated/descriptor.blob data.proto
|
||||
protoc --proto_path=$(PREFIX)/include:. --include_imports --go_out=generated/ --descriptor_set_out=generated/descriptor.blob data.proto
|
||||
touch $@
|
||||
|
||||
clean:
|
||||
|
|
|
@ -14,6 +14,9 @@
|
|||
package model
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"code.google.com/p/goprotobuf/proto"
|
||||
"fmt"
|
||||
dto "github.com/prometheus/prometheus/model/generated"
|
||||
"time"
|
||||
)
|
||||
|
@ -24,16 +27,77 @@ type CurationRemark struct {
|
|||
LastCompletionTimestamp time.Time
|
||||
}
|
||||
|
||||
// OlderThanLimit answers whether this CurationRemark is older than the provided
|
||||
// OlderThan answers whether this CurationRemark is older than the provided
|
||||
// cutOff time.
|
||||
func (c CurationRemark) OlderThanLimit(cutOff time.Time) bool {
|
||||
return c.LastCompletionTimestamp.Before(cutOff)
|
||||
func (c CurationRemark) OlderThan(t time.Time) bool {
|
||||
return c.LastCompletionTimestamp.Before(t)
|
||||
}
|
||||
|
||||
// Equal answers whether the two CurationRemarks are equivalent.
|
||||
func (c CurationRemark) Equal(o CurationRemark) bool {
|
||||
return c.LastCompletionTimestamp.Equal(o.LastCompletionTimestamp)
|
||||
}
|
||||
|
||||
func (c CurationRemark) String() string {
|
||||
return fmt.Sprintf("Last curated at %s", c.LastCompletionTimestamp)
|
||||
}
|
||||
|
||||
// ToDTO generates the dto.CurationValue representation of this.
|
||||
func (c CurationRemark) ToDTO() *dto.CurationValue {
|
||||
return &dto.CurationValue{
|
||||
LastCompletionTimestamp: proto.Int64(c.LastCompletionTimestamp.Unix()),
|
||||
}
|
||||
}
|
||||
|
||||
// NewCurationRemarkFromDTO builds CurationRemark from the provided
|
||||
// dto.CurationValue object.
|
||||
func NewCurationRemarkFromDTO(d *dto.CurationValue) CurationRemark {
|
||||
return CurationRemark{
|
||||
LastCompletionTimestamp: time.Unix(*d.LastCompletionTimestamp, 0),
|
||||
LastCompletionTimestamp: time.Unix(*d.LastCompletionTimestamp, 0).UTC(),
|
||||
}
|
||||
}
|
||||
|
||||
// CurationKey provides a representation of dto.CurationKey with asociated
|
||||
// business logic methods attached to it to enhance code readability.
|
||||
type CurationKey struct {
|
||||
Fingerprint Fingerprint
|
||||
ProcessorMessageRaw []byte
|
||||
ProcessorMessageTypeName string
|
||||
IgnoreYoungerThan time.Duration
|
||||
}
|
||||
|
||||
// Equal answers whether the two CurationKeys are equivalent.
|
||||
func (c CurationKey) Equal(o CurationKey) (equal bool) {
|
||||
switch {
|
||||
case !c.Fingerprint.Equal(o.Fingerprint):
|
||||
return
|
||||
case bytes.Compare(c.ProcessorMessageRaw, o.ProcessorMessageRaw) != 0:
|
||||
return
|
||||
case c.ProcessorMessageTypeName != o.ProcessorMessageTypeName:
|
||||
return
|
||||
case c.IgnoreYoungerThan != o.IgnoreYoungerThan:
|
||||
return
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
// ToDTO generates a dto.CurationKey representation of this.
|
||||
func (c CurationKey) ToDTO() *dto.CurationKey {
|
||||
return &dto.CurationKey{
|
||||
Fingerprint: c.Fingerprint.ToDTO(),
|
||||
ProcessorMessageRaw: c.ProcessorMessageRaw,
|
||||
ProcessorMessageTypeName: proto.String(c.ProcessorMessageTypeName),
|
||||
IgnoreYoungerThan: proto.Int64(int64(c.IgnoreYoungerThan)),
|
||||
}
|
||||
}
|
||||
|
||||
// NewCurationKeyFromDTO builds CurationKey from the provided dto.CurationKey.
|
||||
func NewCurationKeyFromDTO(d *dto.CurationKey) CurationKey {
|
||||
return CurationKey{
|
||||
Fingerprint: NewFingerprintFromDTO(d.Fingerprint),
|
||||
ProcessorMessageRaw: d.ProcessorMessageRaw,
|
||||
ProcessorMessageTypeName: *d.ProcessorMessageTypeName,
|
||||
IgnoreYoungerThan: time.Duration(*d.IgnoreYoungerThan),
|
||||
}
|
||||
}
|
||||
|
|
|
@ -65,6 +65,14 @@ message MetricHighWatermark {
|
|||
optional int64 timestamp = 1;
|
||||
}
|
||||
|
||||
// CompactionProcessorDefinition models a curation process across the sample
|
||||
// corpus that ensures that sparse samples.
|
||||
message CompactionProcessorDefinition {
|
||||
// minimum_group_size identifies how minimally samples should be grouped
|
||||
// together to write a new SampleValueSeries chunk.
|
||||
optional uint32 minimum_group_size = 1;
|
||||
}
|
||||
|
||||
// CurationKey models the state of curation for a given metric fingerprint and
|
||||
// its associated samples. The time series database only knows about compaction
|
||||
// and resampling behaviors that are explicitly defined to it in its runtime
|
||||
|
@ -73,19 +81,38 @@ message MetricHighWatermark {
|
|||
// effectuation state for a given metric fingerprint is.
|
||||
//
|
||||
// For instance, how far along as a rule for (Fingerprint A, Samples Older Than
|
||||
// B, and Grouped Together in Size of C) has been effectuated on-disk.
|
||||
// B, and Curation Processor) has been effectuated on-disk.
|
||||
message CurationKey {
|
||||
// fingerprint identifies the fingerprint for the given policy.
|
||||
optional Fingerprint fingerprint = 1;
|
||||
// older_than represents in seconds relative to when curation cycle starts
|
||||
// into the past when the curator should stop operating on a given metric
|
||||
// fingerprint's samples:
|
||||
optional Fingerprint fingerprint = 1;
|
||||
|
||||
// processor_message_type_name identifies the underlying message type that
|
||||
// was used to encode processor_message_raw.
|
||||
optional string processor_message_type_name = 2;
|
||||
|
||||
// processor_message_raw identifies the serialized ProcessorSignature for this
|
||||
// operation.
|
||||
optional bytes processor_message_raw = 3;
|
||||
|
||||
// ignore_younger_than represents in seconds relative to when the curation
|
||||
// cycle start when the curator should stop operating. For instance, if
|
||||
// the curation cycle starts at time T and the curation remark dictates that
|
||||
// the curation should starts processing samples at time S, the curator should
|
||||
// work from S until ignore_younger_than seconds before T:
|
||||
//
|
||||
// [Oldest Sample Time, time.Now().Sub(time.Second * older_than))
|
||||
optional int64 older_than = 2;
|
||||
// minimum_group_size identifies how minimally samples should be grouped
|
||||
// together to write a new SampleValueSeries chunk.
|
||||
optional uint32 minimum_group_size = 3;
|
||||
// PAST NOW FUTURE
|
||||
//
|
||||
// S--------------->|----------T
|
||||
// |---IYT----|
|
||||
//
|
||||
// [Curation Resumption Time (S), T - IYT)
|
||||
optional int64 ignore_younger_than = 4;
|
||||
|
||||
// This could be populated by decoding the generated descriptor file into a
|
||||
// FileDescriptorSet message and extracting the type definition for the given
|
||||
// message schema that describes processor_message_type_name.
|
||||
//
|
||||
// optional google.protobuf.DescriptorProto processor_message_type_descriptor_raw = 5;
|
||||
}
|
||||
|
||||
// CurationValue models the progress for a given CurationKey.
|
||||
|
|
2
model/generated/.gitignore
vendored
2
model/generated/.gitignore
vendored
|
@ -1,2 +1,2 @@
|
|||
data.pb.go
|
||||
descriptor.blob
|
||||
descriptor.blob
|
|
@ -92,7 +92,11 @@ func (s SampleValue) ToDTO() *float64 {
|
|||
}
|
||||
|
||||
func (v SampleValue) MarshalJSON() ([]byte, error) {
|
||||
return []byte(fmt.Sprintf("\"%f\"", v)), nil
|
||||
return []byte(fmt.Sprintf(`"%f"`, v)), nil
|
||||
}
|
||||
|
||||
func (v SampleValue) String() string {
|
||||
return fmt.Sprint(float64(v))
|
||||
}
|
||||
|
||||
func (s SamplePair) MarshalJSON() ([]byte, error) {
|
||||
|
@ -108,6 +112,19 @@ func (s SamplePair) Equal(o SamplePair) bool {
|
|||
return s.Value.Equal(o.Value) && s.Timestamp.Equal(o.Timestamp)
|
||||
}
|
||||
|
||||
func (s SamplePair) ToDTO() (out *dto.SampleValueSeries_Value) {
|
||||
out = &dto.SampleValueSeries_Value{
|
||||
Timestamp: proto.Int64(s.Timestamp.Unix()),
|
||||
Value: s.Value.ToDTO(),
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func (s SamplePair) String() string {
|
||||
return fmt.Sprintf("SamplePair at %s of %s", s.Timestamp, s.Value)
|
||||
}
|
||||
|
||||
type Values []SamplePair
|
||||
|
||||
func (v Values) Len() int {
|
||||
|
@ -174,6 +191,40 @@ func (v Values) TruncateBefore(t time.Time) (values Values) {
|
|||
return
|
||||
}
|
||||
|
||||
func (v Values) ToDTO() (out *dto.SampleValueSeries) {
|
||||
out = &dto.SampleValueSeries{}
|
||||
|
||||
for _, value := range v {
|
||||
out.Value = append(out.Value, value.ToDTO())
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func (v Values) ToSampleKey(f Fingerprint) SampleKey {
|
||||
return SampleKey{
|
||||
Fingerprint: f,
|
||||
FirstTimestamp: v[0].Timestamp,
|
||||
LastTimestamp: v[len(v)-1].Timestamp,
|
||||
SampleCount: uint32(len(v)),
|
||||
}
|
||||
}
|
||||
|
||||
func (v Values) String() string {
|
||||
buffer := bytes.Buffer{}
|
||||
|
||||
fmt.Fprintf(&buffer, "[")
|
||||
for i, value := range v {
|
||||
fmt.Fprintf(&buffer, "%d. %s", i, value)
|
||||
if i != len(v)-1 {
|
||||
fmt.Fprintf(&buffer, "\n")
|
||||
}
|
||||
}
|
||||
fmt.Fprintf(&buffer, "]")
|
||||
|
||||
return buffer.String()
|
||||
}
|
||||
|
||||
func NewValuesFromDTO(dto *dto.SampleValueSeries) (v Values) {
|
||||
for _, value := range dto.Value {
|
||||
v = append(v, SamplePair{
|
||||
|
|
|
@ -15,6 +15,7 @@ package model
|
|||
|
||||
import (
|
||||
"code.google.com/p/goprotobuf/proto"
|
||||
"fmt"
|
||||
"github.com/prometheus/prometheus/coding/indexable"
|
||||
dto "github.com/prometheus/prometheus/model/generated"
|
||||
"time"
|
||||
|
@ -67,6 +68,10 @@ func (s SampleKey) ToPartialDTO(out *dto.SampleKey) {
|
|||
return
|
||||
}
|
||||
|
||||
func (s SampleKey) String() string {
|
||||
return fmt.Sprintf("SampleKey for %s at %s to %s with %d values.", s.Fingerprint, s.FirstTimestamp, s.LastTimestamp, s.SampleCount)
|
||||
}
|
||||
|
||||
// NewSampleKeyFromDTO builds a new SampleKey from a provided data-transfer
|
||||
// object.
|
||||
func NewSampleKeyFromDTO(dto *dto.SampleKey) SampleKey {
|
||||
|
|
|
@ -41,11 +41,11 @@ const (
|
|||
// alert is used to track active (pending/firing) alerts over time.
|
||||
type alert struct {
|
||||
// The name of the alert.
|
||||
name string
|
||||
name string
|
||||
// The vector element labelset triggering this alert.
|
||||
metric model.Metric
|
||||
metric model.Metric
|
||||
// The state of the alert (PENDING or FIRING).
|
||||
state alertState
|
||||
state alertState
|
||||
// The time when the alert first transitioned into PENDING state.
|
||||
activeSince time.Time
|
||||
}
|
||||
|
@ -71,14 +71,14 @@ func (a alert) sample(timestamp time.Time, value model.SampleValue) model.Sample
|
|||
// An alerting rule generates alerts from its vector expression.
|
||||
type AlertingRule struct {
|
||||
// The name of the alert.
|
||||
name string
|
||||
// The vector expression from which to generate alerts.
|
||||
vector ast.VectorNode
|
||||
name string
|
||||
// The vector expression from which to generate alerts.
|
||||
vector ast.VectorNode
|
||||
// The duration for which a labelset needs to persist in the expression
|
||||
// output vector before an alert transitions from PENDING to FIRING state.
|
||||
holdDuration time.Duration
|
||||
// Extra labels to attach to the resulting alert sample vectors.
|
||||
labels model.LabelSet
|
||||
labels model.LabelSet
|
||||
// A map of alerts which are currently active (PENDING or FIRING), keyed by
|
||||
// the fingerprint of the labelset they correspond to.
|
||||
activeAlerts map[model.Fingerprint]*alert
|
||||
|
|
|
@ -34,6 +34,19 @@ const (
|
|||
ACCEPT
|
||||
)
|
||||
|
||||
func (f FilterResult) String() string {
|
||||
switch f {
|
||||
case STOP:
|
||||
return "STOP"
|
||||
case SKIP:
|
||||
return "SKIP"
|
||||
case ACCEPT:
|
||||
return "ACCEPT"
|
||||
}
|
||||
|
||||
panic("unknown")
|
||||
}
|
||||
|
||||
type OperatorErrorType int
|
||||
|
||||
type OperatorError struct {
|
||||
|
|
|
@ -15,14 +15,36 @@ package metric
|
|||
|
||||
import (
|
||||
"code.google.com/p/goprotobuf/proto"
|
||||
"fmt"
|
||||
"github.com/prometheus/prometheus/coding"
|
||||
"github.com/prometheus/prometheus/model"
|
||||
dto "github.com/prometheus/prometheus/model/generated"
|
||||
"github.com/prometheus/prometheus/storage"
|
||||
"github.com/prometheus/prometheus/storage/raw"
|
||||
"github.com/prometheus/prometheus/storage/raw/leveldb"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
// watermarkFilter determines whether to include or exclude candidate
|
||||
// values from the curation process by virtue of how old the high watermark is.
|
||||
type watermarkFilter struct {
|
||||
// curationState is the data store for curation remarks.
|
||||
curationState raw.Persistence
|
||||
// ignoreYoungerThan conveys this filter's policy of not working on elements
|
||||
// younger than a given relative time duration. This is persisted to the
|
||||
// curation remark database (curationState) to indicate how far a given
|
||||
// policy of this type has progressed.
|
||||
ignoreYoungerThan time.Duration
|
||||
// processor is the post-processor that performs whatever action is desired on
|
||||
// the data that is deemed valid to be worked on.
|
||||
processor processor
|
||||
// stop functions as the global stop channel for all future operations.
|
||||
stop chan bool
|
||||
// stopAt is used to determine the elegibility of series for compaction.
|
||||
stopAt time.Time
|
||||
}
|
||||
|
||||
// curator is responsible for effectuating a given curation policy across the
|
||||
// stored samples on-disk. This is useful to compact sparse sample values into
|
||||
// single sample entities to reduce keyspace load on the datastore.
|
||||
|
@ -31,51 +53,103 @@ type curator struct {
|
|||
// The moment a value is ingested inside of it, the curator goes into drain
|
||||
// mode.
|
||||
stop chan bool
|
||||
// samples is the on-disk metric store that is scanned for compaction
|
||||
// candidates.
|
||||
samples raw.Persistence
|
||||
// watermarks is the on-disk store that is scanned for high watermarks for
|
||||
// given metrics.
|
||||
watermarks raw.Persistence
|
||||
// recencyThreshold represents the most recent time up to which values will be
|
||||
// curated.
|
||||
recencyThreshold time.Duration
|
||||
// groupingQuantity represents the number of samples below which encountered
|
||||
// samples will be dismembered and reaggregated into larger groups.
|
||||
groupingQuantity uint32
|
||||
// curationState is the on-disk store where the curation remarks are made for
|
||||
// how much progress has been made.
|
||||
}
|
||||
|
||||
// watermarkDecoder converts (dto.Fingerprint, dto.MetricHighWatermark) doubles
|
||||
// into (model.Fingerprint, model.Watermark) doubles.
|
||||
type watermarkDecoder struct{}
|
||||
|
||||
// watermarkOperator scans over the curator.samples table for metrics whose
|
||||
// high watermark has been determined to be allowable for curation. This type
|
||||
// is individually responsible for compaction.
|
||||
//
|
||||
// The scanning starts from CurationRemark.LastCompletionTimestamp and goes
|
||||
// forward until the stop point or end of the series is reached.
|
||||
type watermarkOperator struct {
|
||||
// curationState is the data store for curation remarks.
|
||||
curationState raw.Persistence
|
||||
// diskFrontier models the available seekable ranges for the provided
|
||||
// sampleIterator.
|
||||
diskFrontier diskFrontier
|
||||
// ignoreYoungerThan is passed into the curation remark for the given series.
|
||||
ignoreYoungerThan time.Duration
|
||||
// processor is responsible for executing a given stategy on the
|
||||
// to-be-operated-on series.
|
||||
processor processor
|
||||
// sampleIterator is a snapshotted iterator for the time series.
|
||||
sampleIterator leveldb.Iterator
|
||||
// samples
|
||||
samples raw.Persistence
|
||||
// stopAt is a cue for when to stop mutating a given series.
|
||||
stopAt time.Time
|
||||
}
|
||||
|
||||
// newCurator builds a new curator for the given LevelDB databases.
|
||||
func newCurator(recencyThreshold time.Duration, groupingQuantity uint32, curationState, samples, watermarks raw.Persistence) curator {
|
||||
func newCurator() curator {
|
||||
return curator{
|
||||
recencyThreshold: recencyThreshold,
|
||||
stop: make(chan bool),
|
||||
samples: samples,
|
||||
curationState: curationState,
|
||||
watermarks: watermarks,
|
||||
groupingQuantity: groupingQuantity,
|
||||
stop: make(chan bool),
|
||||
}
|
||||
}
|
||||
|
||||
// run facilitates the curation lifecycle.
|
||||
func (c curator) run(instant time.Time) (err error) {
|
||||
decoder := watermarkDecoder{}
|
||||
filter := watermarkFilter{
|
||||
stop: c.stop,
|
||||
curationState: c.curationState,
|
||||
groupSize: c.groupingQuantity,
|
||||
recencyThreshold: c.recencyThreshold,
|
||||
//
|
||||
// recencyThreshold represents the most recent time up to which values will be
|
||||
// curated.
|
||||
// curationState is the on-disk store where the curation remarks are made for
|
||||
// how much progress has been made.
|
||||
func (c curator) run(ignoreYoungerThan time.Duration, instant time.Time, processor processor, curationState, samples, watermarks *leveldb.LevelDBPersistence) (err error) {
|
||||
defer func(t time.Time) {
|
||||
duration := float64(time.Since(t))
|
||||
|
||||
labels := map[string]string{
|
||||
cutOff: fmt.Sprint(ignoreYoungerThan),
|
||||
processorName: processor.Name(),
|
||||
result: success,
|
||||
}
|
||||
if err != nil {
|
||||
labels[result] = failure
|
||||
}
|
||||
|
||||
curationDuration.IncrementBy(labels, duration)
|
||||
curationDurations.Add(labels, duration)
|
||||
}(time.Now())
|
||||
|
||||
iterator := samples.NewIterator(true)
|
||||
defer iterator.Close()
|
||||
|
||||
diskFrontier, err := newDiskFrontier(iterator)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
operator := watermarkOperator{
|
||||
olderThan: instant.Add(-1 * c.recencyThreshold),
|
||||
groupSize: c.groupingQuantity,
|
||||
curationState: c.curationState,
|
||||
if diskFrontier == nil {
|
||||
// No sample database exists; no work to do!
|
||||
return
|
||||
}
|
||||
|
||||
_, err = c.watermarks.ForEach(decoder, filter, operator)
|
||||
decoder := watermarkDecoder{}
|
||||
|
||||
filter := watermarkFilter{
|
||||
curationState: curationState,
|
||||
processor: processor,
|
||||
ignoreYoungerThan: ignoreYoungerThan,
|
||||
stop: c.stop,
|
||||
stopAt: instant.Add(-1 * ignoreYoungerThan),
|
||||
}
|
||||
|
||||
// Right now, the ability to stop a curation is limited to the beginning of
|
||||
// each fingerprint cycle. It is impractical to cease the work once it has
|
||||
// begun for a given series.
|
||||
operator := watermarkOperator{
|
||||
curationState: curationState,
|
||||
diskFrontier: *diskFrontier,
|
||||
processor: processor,
|
||||
ignoreYoungerThan: ignoreYoungerThan,
|
||||
sampleIterator: iterator,
|
||||
samples: samples,
|
||||
stopAt: instant.Add(-1 * ignoreYoungerThan),
|
||||
}
|
||||
|
||||
_, err = watermarks.ForEach(decoder, filter, operator)
|
||||
|
||||
return
|
||||
}
|
||||
|
@ -88,35 +162,27 @@ func (c curator) drain() {
|
|||
}
|
||||
}
|
||||
|
||||
// watermarkDecoder converts (dto.Fingerprint, dto.MetricHighWatermark) doubles
|
||||
// into (model.Fingerprint, model.Watermark) doubles.
|
||||
type watermarkDecoder struct{}
|
||||
|
||||
func (w watermarkDecoder) DecodeKey(in interface{}) (out interface{}, err error) {
|
||||
var (
|
||||
key = &dto.Fingerprint{}
|
||||
bytes = in.([]byte)
|
||||
)
|
||||
key := &dto.Fingerprint{}
|
||||
bytes := in.([]byte)
|
||||
|
||||
err = proto.Unmarshal(bytes, key)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
return
|
||||
}
|
||||
|
||||
out = model.NewFingerprintFromRowKey(*key.Signature)
|
||||
out = model.NewFingerprintFromDTO(key)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func (w watermarkDecoder) DecodeValue(in interface{}) (out interface{}, err error) {
|
||||
var (
|
||||
dto = &dto.MetricHighWatermark{}
|
||||
bytes = in.([]byte)
|
||||
)
|
||||
dto := &dto.MetricHighWatermark{}
|
||||
bytes := in.([]byte)
|
||||
|
||||
err = proto.Unmarshal(bytes, dto)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
return
|
||||
}
|
||||
|
||||
out = model.NewWatermarkFromHighWatermarkDTO(dto)
|
||||
|
@ -124,149 +190,181 @@ func (w watermarkDecoder) DecodeValue(in interface{}) (out interface{}, err erro
|
|||
return
|
||||
}
|
||||
|
||||
// watermarkFilter determines whether to include or exclude candidate
|
||||
// values from the curation process by virtue of how old the high watermark is.
|
||||
type watermarkFilter struct {
|
||||
// curationState is the table of CurationKey to CurationValues that rema
|
||||
// far along the curation process has gone for a given metric fingerprint.
|
||||
curationState raw.Persistence
|
||||
// stop, when non-empty, instructs the filter to stop operation.
|
||||
stop chan bool
|
||||
// groupSize refers to the target groupSize from the curator.
|
||||
groupSize uint32
|
||||
// recencyThreshold refers to the target recencyThreshold from the curator.
|
||||
recencyThreshold time.Duration
|
||||
func (w watermarkFilter) shouldStop() bool {
|
||||
return len(w.stop) != 0
|
||||
}
|
||||
|
||||
func (w watermarkFilter) Filter(key, value interface{}) (result storage.FilterResult) {
|
||||
fingerprint := key.(model.Fingerprint)
|
||||
watermark := value.(model.Watermark)
|
||||
curationKey := &dto.CurationKey{
|
||||
Fingerprint: fingerprint.ToDTO(),
|
||||
MinimumGroupSize: proto.Uint32(w.groupSize),
|
||||
OlderThan: proto.Int64(int64(w.recencyThreshold)),
|
||||
func getCurationRemark(states raw.Persistence, processor processor, ignoreYoungerThan time.Duration, fingerprint model.Fingerprint) (remark *model.CurationRemark, err error) {
|
||||
rawSignature, err := processor.Signature()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
curationKey := model.CurationKey{
|
||||
Fingerprint: fingerprint,
|
||||
ProcessorMessageRaw: rawSignature,
|
||||
ProcessorMessageTypeName: processor.Name(),
|
||||
IgnoreYoungerThan: ignoreYoungerThan,
|
||||
}.ToDTO()
|
||||
curationValue := &dto.CurationValue{}
|
||||
|
||||
rawCurationValue, err := w.curationState.Get(coding.NewProtocolBuffer(curationKey))
|
||||
rawKey := coding.NewProtocolBuffer(curationKey)
|
||||
|
||||
has, err := states.Has(rawKey)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
return
|
||||
}
|
||||
if !has {
|
||||
return
|
||||
}
|
||||
|
||||
rawCurationValue, err := states.Get(rawKey)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
err = proto.Unmarshal(rawCurationValue, curationValue)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
return
|
||||
}
|
||||
|
||||
switch {
|
||||
case model.NewCurationRemarkFromDTO(curationValue).OlderThanLimit(watermark.Time):
|
||||
result = storage.ACCEPT
|
||||
case len(w.stop) != 0:
|
||||
result = storage.STOP
|
||||
default:
|
||||
result = storage.SKIP
|
||||
}
|
||||
baseRemark := model.NewCurationRemarkFromDTO(curationValue)
|
||||
remark = &baseRemark
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// watermarkOperator scans over the curator.samples table for metrics whose
|
||||
// high watermark has been determined to be allowable for curation. This type
|
||||
// is individually responsible for compaction.
|
||||
//
|
||||
// The scanning starts from CurationRemark.LastCompletionTimestamp and goes
|
||||
// forward until the stop point or end of the series is reached.
|
||||
type watermarkOperator struct {
|
||||
// olderThan functions as the cutoff when scanning curator.samples for
|
||||
// uncurated samples to compact. The operator scans forward in the samples
|
||||
// until olderThan is reached and then stops operation for samples that occur
|
||||
// after it.
|
||||
olderThan time.Time
|
||||
// groupSize is the target quantity of samples to group together for a given
|
||||
// to-be-written sample. Observed samples of less than groupSize are combined
|
||||
// up to groupSize if possible. The protocol does not define the behavior if
|
||||
// observed chunks are larger than groupSize.
|
||||
groupSize uint32
|
||||
// curationState is the table of CurationKey to CurationValues that remark on
|
||||
// far along the curation process has gone for a given metric fingerprint.
|
||||
curationState raw.Persistence
|
||||
}
|
||||
func (w watermarkFilter) Filter(key, value interface{}) (r storage.FilterResult) {
|
||||
defer func() {
|
||||
labels := map[string]string{
|
||||
cutOff: fmt.Sprint(w.ignoreYoungerThan),
|
||||
result: strings.ToLower(r.String()),
|
||||
processorName: w.processor.Name(),
|
||||
}
|
||||
|
||||
func (w watermarkOperator) Operate(key, value interface{}) (err *storage.OperatorError) {
|
||||
var (
|
||||
fingerprint = key.(model.Fingerprint)
|
||||
watermark = value.(model.Watermark)
|
||||
queryErr error
|
||||
hasBeenCurated bool
|
||||
curationConsistent bool
|
||||
)
|
||||
hasBeenCurated, queryErr = w.hasBeenCurated(fingerprint)
|
||||
if queryErr != nil {
|
||||
err = &storage.OperatorError{queryErr, false}
|
||||
return
|
||||
curationFilterOperations.Increment(labels)
|
||||
}()
|
||||
|
||||
if w.shouldStop() {
|
||||
return storage.STOP
|
||||
}
|
||||
|
||||
if !hasBeenCurated {
|
||||
// curate
|
||||
fingerprint := key.(model.Fingerprint)
|
||||
curationRemark, err := getCurationRemark(w.curationState, w.processor, w.ignoreYoungerThan, fingerprint)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
curationConsistent, queryErr = w.curationConsistent(fingerprint, watermark)
|
||||
if queryErr != nil {
|
||||
err = &storage.OperatorError{queryErr, false}
|
||||
if curationRemark == nil {
|
||||
r = storage.ACCEPT
|
||||
return
|
||||
}
|
||||
if !curationRemark.OlderThan(w.stopAt) {
|
||||
return storage.SKIP
|
||||
}
|
||||
watermark := value.(model.Watermark)
|
||||
if !curationRemark.OlderThan(watermark.Time) {
|
||||
return storage.SKIP
|
||||
}
|
||||
curationConsistent, err := w.curationConsistent(fingerprint, watermark)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
if curationConsistent {
|
||||
return
|
||||
return storage.SKIP
|
||||
}
|
||||
|
||||
// curate
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// hasBeenCurated answers true if the provided Fingerprint has been curated in
|
||||
// in the past.
|
||||
func (w watermarkOperator) hasBeenCurated(f model.Fingerprint) (curated bool, err error) {
|
||||
curationKey := &dto.CurationKey{
|
||||
Fingerprint: f.ToDTO(),
|
||||
OlderThan: proto.Int64(w.olderThan.Unix()),
|
||||
MinimumGroupSize: proto.Uint32(w.groupSize),
|
||||
}
|
||||
|
||||
curated, err = w.curationState.Has(coding.NewProtocolBuffer(curationKey))
|
||||
|
||||
return
|
||||
return storage.ACCEPT
|
||||
}
|
||||
|
||||
// curationConsistent determines whether the given metric is in a dirty state
|
||||
// and needs curation.
|
||||
func (w watermarkOperator) curationConsistent(f model.Fingerprint, watermark model.Watermark) (consistent bool, err error) {
|
||||
var (
|
||||
rawValue []byte
|
||||
curationValue = &dto.CurationValue{}
|
||||
curationKey = &dto.CurationKey{
|
||||
Fingerprint: f.ToDTO(),
|
||||
OlderThan: proto.Int64(w.olderThan.Unix()),
|
||||
MinimumGroupSize: proto.Uint32(w.groupSize),
|
||||
}
|
||||
)
|
||||
|
||||
rawValue, err = w.curationState.Get(coding.NewProtocolBuffer(curationKey))
|
||||
func (w watermarkFilter) curationConsistent(f model.Fingerprint, watermark model.Watermark) (consistent bool, err error) {
|
||||
curationRemark, err := getCurationRemark(w.curationState, w.processor, w.ignoreYoungerThan, f)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
err = proto.Unmarshal(rawValue, curationValue)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
curationRemark := model.NewCurationRemarkFromDTO(curationValue)
|
||||
if !curationRemark.OlderThanLimit(watermark.Time) {
|
||||
if !curationRemark.OlderThan(watermark.Time) {
|
||||
consistent = true
|
||||
return
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func (w watermarkOperator) Operate(key, _ interface{}) (oErr *storage.OperatorError) {
|
||||
fingerprint := key.(model.Fingerprint)
|
||||
|
||||
seriesFrontier, err := newSeriesFrontier(fingerprint, w.diskFrontier, w.sampleIterator)
|
||||
if err != nil || seriesFrontier == nil {
|
||||
// An anomaly with the series frontier is severe in the sense that some sort
|
||||
// of an illegal state condition exists in the storage layer, which would
|
||||
// probably signify an illegal disk frontier.
|
||||
return &storage.OperatorError{err, false}
|
||||
}
|
||||
|
||||
curationState, err := getCurationRemark(w.curationState, w.processor, w.ignoreYoungerThan, fingerprint)
|
||||
if err != nil {
|
||||
// An anomaly with the curation remark is likely not fatal in the sense that
|
||||
// there was a decoding error with the entity and shouldn't be cause to stop
|
||||
// work. The process will simply start from a pessimistic work time and
|
||||
// work forward. With an idempotent processor, this is safe.
|
||||
return &storage.OperatorError{err, true}
|
||||
}
|
||||
|
||||
startKey := model.SampleKey{
|
||||
Fingerprint: fingerprint,
|
||||
FirstTimestamp: seriesFrontier.optimalStartTime(curationState),
|
||||
}
|
||||
|
||||
prospectiveKey, err := coding.NewProtocolBuffer(startKey.ToDTO()).Encode()
|
||||
if err != nil {
|
||||
// An encoding failure of a key is no reason to stop.
|
||||
return &storage.OperatorError{err, true}
|
||||
}
|
||||
if !w.sampleIterator.Seek(prospectiveKey) {
|
||||
// LevelDB is picky about the seek ranges. If an iterator was invalidated,
|
||||
// no work may occur, and the iterator cannot be recovered.
|
||||
return &storage.OperatorError{fmt.Errorf("Illegal Condition: Iterator invalidated due to seek range."), false}
|
||||
}
|
||||
|
||||
newestAllowedSample := w.stopAt
|
||||
if !newestAllowedSample.Before(seriesFrontier.lastSupertime) {
|
||||
newestAllowedSample = seriesFrontier.lastSupertime
|
||||
}
|
||||
|
||||
lastTime, err := w.processor.Apply(w.sampleIterator, w.samples, newestAllowedSample, fingerprint)
|
||||
if err != nil {
|
||||
// We can't divine the severity of a processor error without refactoring the
|
||||
// interface.
|
||||
return &storage.OperatorError{err, false}
|
||||
}
|
||||
|
||||
err = w.refreshCurationRemark(fingerprint, lastTime)
|
||||
if err != nil {
|
||||
// Under the assumption that the processors are idempotent, they can be
|
||||
// re-run; thusly, the commitment of the curation remark is no cause
|
||||
// to cease further progress.
|
||||
return &storage.OperatorError{err, true}
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func (w watermarkOperator) refreshCurationRemark(f model.Fingerprint, finished time.Time) (err error) {
|
||||
signature, err := w.processor.Signature()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
curationKey := model.CurationKey{
|
||||
Fingerprint: f,
|
||||
ProcessorMessageRaw: signature,
|
||||
ProcessorMessageTypeName: w.processor.Name(),
|
||||
IgnoreYoungerThan: w.ignoreYoungerThan,
|
||||
}.ToDTO()
|
||||
curationValue := model.CurationRemark{
|
||||
LastCompletionTimestamp: finished,
|
||||
}.ToDTO()
|
||||
|
||||
err = w.curationState.Put(coding.NewProtocolBuffer(curationKey), coding.NewProtocolBuffer(curationValue))
|
||||
|
||||
return
|
||||
}
|
||||
|
|
|
@ -1,523 +0,0 @@
|
|||
// Copyright 2013 Prometheus Team
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package metric
|
||||
|
||||
import (
|
||||
"code.google.com/p/goprotobuf/proto"
|
||||
"github.com/prometheus/prometheus/coding"
|
||||
"github.com/prometheus/prometheus/coding/indexable"
|
||||
"github.com/prometheus/prometheus/model"
|
||||
dto "github.com/prometheus/prometheus/model/generated"
|
||||
"github.com/prometheus/prometheus/storage/raw/leveldb"
|
||||
fixture "github.com/prometheus/prometheus/storage/raw/leveldb/test"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
type (
|
||||
curationState struct {
|
||||
fingerprint string
|
||||
groupSize int
|
||||
recencyThreshold time.Duration
|
||||
lastCurated time.Time
|
||||
}
|
||||
|
||||
watermarkState struct {
|
||||
fingerprint string
|
||||
lastAppended time.Time
|
||||
}
|
||||
|
||||
sample struct {
|
||||
time time.Time
|
||||
value model.SampleValue
|
||||
}
|
||||
|
||||
sampleGroup struct {
|
||||
fingerprint string
|
||||
values []sample
|
||||
}
|
||||
|
||||
in struct {
|
||||
curationStates fixture.Pairs
|
||||
watermarkStates fixture.Pairs
|
||||
sampleGroups fixture.Pairs
|
||||
recencyThreshold time.Duration
|
||||
groupSize uint32
|
||||
}
|
||||
)
|
||||
|
||||
func (c curationState) Get() (key, value coding.Encoder) {
|
||||
key = coding.NewProtocolBuffer(&dto.CurationKey{
|
||||
Fingerprint: model.NewFingerprintFromRowKey(c.fingerprint).ToDTO(),
|
||||
MinimumGroupSize: proto.Uint32(uint32(c.groupSize)),
|
||||
OlderThan: proto.Int64(int64(c.recencyThreshold)),
|
||||
})
|
||||
|
||||
value = coding.NewProtocolBuffer(&dto.CurationValue{
|
||||
LastCompletionTimestamp: proto.Int64(c.lastCurated.Unix()),
|
||||
})
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func (w watermarkState) Get() (key, value coding.Encoder) {
|
||||
key = coding.NewProtocolBuffer(model.NewFingerprintFromRowKey(w.fingerprint).ToDTO())
|
||||
value = coding.NewProtocolBuffer(model.NewWatermarkFromTime(w.lastAppended).ToMetricHighWatermarkDTO())
|
||||
return
|
||||
}
|
||||
|
||||
func (s sampleGroup) Get() (key, value coding.Encoder) {
|
||||
key = coding.NewProtocolBuffer(&dto.SampleKey{
|
||||
Fingerprint: model.NewFingerprintFromRowKey(s.fingerprint).ToDTO(),
|
||||
Timestamp: indexable.EncodeTime(s.values[0].time),
|
||||
LastTimestamp: proto.Int64(s.values[len(s.values)-1].time.Unix()),
|
||||
SampleCount: proto.Uint32(uint32(len(s.values))),
|
||||
})
|
||||
|
||||
series := &dto.SampleValueSeries{}
|
||||
|
||||
for _, value := range s.values {
|
||||
series.Value = append(series.Value, &dto.SampleValueSeries_Value{
|
||||
Timestamp: proto.Int64(value.time.Unix()),
|
||||
Value: value.value.ToDTO(),
|
||||
})
|
||||
}
|
||||
|
||||
value = coding.NewProtocolBuffer(series)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func TestCurator(t *testing.T) {
|
||||
var (
|
||||
scenarios = []struct {
|
||||
in in
|
||||
}{
|
||||
{
|
||||
in: in{
|
||||
recencyThreshold: 1 * time.Hour,
|
||||
groupSize: 5,
|
||||
curationStates: fixture.Pairs{
|
||||
curationState{
|
||||
fingerprint: "0001-A-1-Z",
|
||||
groupSize: 5,
|
||||
recencyThreshold: 1 * time.Hour,
|
||||
lastCurated: testInstant.Add(-1 * 30 * time.Minute),
|
||||
},
|
||||
curationState{
|
||||
fingerprint: "0002-A-2-Z",
|
||||
groupSize: 5,
|
||||
recencyThreshold: 1 * time.Hour,
|
||||
lastCurated: testInstant.Add(-1 * 90 * time.Minute),
|
||||
},
|
||||
// This rule should effectively be ignored.
|
||||
curationState{
|
||||
fingerprint: "0002-A-2-Z",
|
||||
groupSize: 2,
|
||||
recencyThreshold: 30 * time.Minute,
|
||||
lastCurated: testInstant.Add(-1 * 90 * time.Minute),
|
||||
},
|
||||
},
|
||||
watermarkStates: fixture.Pairs{
|
||||
watermarkState{
|
||||
fingerprint: "0001-A-1-Z",
|
||||
lastAppended: testInstant.Add(-1 * 15 * time.Minute),
|
||||
},
|
||||
watermarkState{
|
||||
fingerprint: "0002-A-2-Z",
|
||||
lastAppended: testInstant.Add(-1 * 15 * time.Minute),
|
||||
},
|
||||
},
|
||||
sampleGroups: fixture.Pairs{
|
||||
sampleGroup{
|
||||
fingerprint: "0001-A-1-Z",
|
||||
values: []sample{
|
||||
{
|
||||
time: testInstant.Add(-1 * 90 * time.Minute),
|
||||
value: 0,
|
||||
},
|
||||
{
|
||||
time: testInstant.Add(-1 * 85 * time.Minute),
|
||||
value: 1,
|
||||
},
|
||||
{
|
||||
time: testInstant.Add(-1 * 80 * time.Minute),
|
||||
value: 2,
|
||||
},
|
||||
{
|
||||
time: testInstant.Add(-1 * 75 * time.Minute),
|
||||
value: 3,
|
||||
},
|
||||
{
|
||||
time: testInstant.Add(-1 * 70 * time.Minute),
|
||||
value: 4,
|
||||
},
|
||||
},
|
||||
},
|
||||
sampleGroup{
|
||||
fingerprint: "0001-A-1-Z",
|
||||
values: []sample{
|
||||
{
|
||||
time: testInstant.Add(-1 * 65 * time.Minute),
|
||||
value: 0,
|
||||
},
|
||||
{
|
||||
time: testInstant.Add(-1 * 60 * time.Minute),
|
||||
value: 1,
|
||||
},
|
||||
{
|
||||
time: testInstant.Add(-1 * 55 * time.Minute),
|
||||
value: 2,
|
||||
},
|
||||
{
|
||||
time: testInstant.Add(-1 * 50 * time.Minute),
|
||||
value: 3,
|
||||
},
|
||||
{
|
||||
time: testInstant.Add(-1 * 45 * time.Minute),
|
||||
value: 4,
|
||||
},
|
||||
},
|
||||
},
|
||||
sampleGroup{
|
||||
fingerprint: "0001-A-1-Z",
|
||||
values: []sample{
|
||||
{
|
||||
time: testInstant.Add(-1 * 40 * time.Minute),
|
||||
value: 0,
|
||||
},
|
||||
{
|
||||
time: testInstant.Add(-1 * 35 * time.Minute),
|
||||
value: 1,
|
||||
},
|
||||
{
|
||||
time: testInstant.Add(-1 * 30 * time.Minute),
|
||||
value: 2,
|
||||
},
|
||||
},
|
||||
},
|
||||
sampleGroup{
|
||||
fingerprint: "0001-A-1-Z",
|
||||
values: []sample{
|
||||
{
|
||||
time: testInstant.Add(-1 * 25 * time.Minute),
|
||||
value: 0,
|
||||
},
|
||||
},
|
||||
},
|
||||
sampleGroup{
|
||||
fingerprint: "0001-A-1-Z",
|
||||
values: []sample{
|
||||
{
|
||||
time: testInstant.Add(-1 * 35 * time.Minute),
|
||||
value: 1,
|
||||
},
|
||||
},
|
||||
},
|
||||
sampleGroup{
|
||||
fingerprint: "0001-A-1-Z",
|
||||
values: []sample{
|
||||
{
|
||||
time: testInstant.Add(-1 * 30 * time.Minute),
|
||||
value: 2,
|
||||
},
|
||||
},
|
||||
},
|
||||
sampleGroup{
|
||||
fingerprint: "0002-A-2-Z",
|
||||
values: []sample{
|
||||
{
|
||||
time: testInstant.Add(-1 * 90 * time.Minute),
|
||||
value: 0,
|
||||
},
|
||||
},
|
||||
},
|
||||
sampleGroup{
|
||||
fingerprint: "0002-A-2-Z",
|
||||
values: []sample{
|
||||
{
|
||||
time: testInstant.Add(-1 * 89 * time.Minute),
|
||||
value: 1,
|
||||
},
|
||||
},
|
||||
},
|
||||
sampleGroup{
|
||||
fingerprint: "0002-A-2-Z",
|
||||
values: []sample{
|
||||
{
|
||||
time: testInstant.Add(-1 * 88 * time.Minute),
|
||||
value: 2,
|
||||
},
|
||||
},
|
||||
},
|
||||
sampleGroup{
|
||||
fingerprint: "0002-A-2-Z",
|
||||
values: []sample{
|
||||
{
|
||||
time: testInstant.Add(-1 * 87 * time.Minute),
|
||||
value: 3,
|
||||
},
|
||||
},
|
||||
},
|
||||
sampleGroup{
|
||||
fingerprint: "0002-A-2-Z",
|
||||
values: []sample{
|
||||
{
|
||||
time: testInstant.Add(-1 * 86 * time.Minute),
|
||||
value: 4,
|
||||
},
|
||||
},
|
||||
},
|
||||
sampleGroup{
|
||||
fingerprint: "0002-A-2-Z",
|
||||
values: []sample{
|
||||
{
|
||||
time: testInstant.Add(-1 * 85 * time.Minute),
|
||||
value: 5,
|
||||
},
|
||||
},
|
||||
},
|
||||
sampleGroup{
|
||||
fingerprint: "0002-A-2-Z",
|
||||
values: []sample{
|
||||
{
|
||||
time: testInstant.Add(-1 * 84 * time.Minute),
|
||||
value: 6,
|
||||
},
|
||||
},
|
||||
},
|
||||
sampleGroup{
|
||||
fingerprint: "0002-A-2-Z",
|
||||
values: []sample{
|
||||
{
|
||||
time: testInstant.Add(-1 * 83 * time.Minute),
|
||||
value: 7,
|
||||
},
|
||||
},
|
||||
},
|
||||
sampleGroup{
|
||||
fingerprint: "0002-A-2-Z",
|
||||
values: []sample{
|
||||
{
|
||||
time: testInstant.Add(-1 * 82 * time.Minute),
|
||||
value: 8,
|
||||
},
|
||||
},
|
||||
},
|
||||
sampleGroup{
|
||||
fingerprint: "0002-A-2-Z",
|
||||
values: []sample{
|
||||
{
|
||||
time: testInstant.Add(-1 * 81 * time.Minute),
|
||||
value: 9,
|
||||
},
|
||||
},
|
||||
},
|
||||
sampleGroup{
|
||||
fingerprint: "0002-A-2-Z",
|
||||
values: []sample{
|
||||
{
|
||||
time: testInstant.Add(-1 * 80 * time.Minute),
|
||||
value: 10,
|
||||
},
|
||||
},
|
||||
},
|
||||
sampleGroup{
|
||||
fingerprint: "0002-A-2-Z",
|
||||
values: []sample{
|
||||
{
|
||||
time: testInstant.Add(-1 * 79 * time.Minute),
|
||||
value: 11,
|
||||
},
|
||||
},
|
||||
},
|
||||
sampleGroup{
|
||||
fingerprint: "0002-A-2-Z",
|
||||
values: []sample{
|
||||
{
|
||||
time: testInstant.Add(-1 * 78 * time.Minute),
|
||||
value: 12,
|
||||
},
|
||||
},
|
||||
},
|
||||
sampleGroup{
|
||||
fingerprint: "0002-A-2-Z",
|
||||
values: []sample{
|
||||
{
|
||||
time: testInstant.Add(-1 * 77 * time.Minute),
|
||||
value: 13,
|
||||
},
|
||||
},
|
||||
},
|
||||
sampleGroup{
|
||||
fingerprint: "0002-A-2-Z",
|
||||
values: []sample{
|
||||
{
|
||||
time: testInstant.Add(-1 * 76 * time.Minute),
|
||||
value: 14,
|
||||
},
|
||||
{
|
||||
time: testInstant.Add(-1 * 75 * time.Minute),
|
||||
value: 15,
|
||||
},
|
||||
{
|
||||
time: testInstant.Add(-1 * 74 * time.Minute),
|
||||
value: 16,
|
||||
},
|
||||
{
|
||||
time: testInstant.Add(-1 * 73 * time.Minute),
|
||||
value: 17,
|
||||
},
|
||||
{
|
||||
time: testInstant.Add(-1 * 72 * time.Minute),
|
||||
value: 18,
|
||||
},
|
||||
{
|
||||
time: testInstant.Add(-1 * 71 * time.Minute),
|
||||
value: 19,
|
||||
},
|
||||
{
|
||||
time: testInstant.Add(-1 * 70 * time.Minute),
|
||||
value: 20,
|
||||
},
|
||||
},
|
||||
},
|
||||
sampleGroup{
|
||||
fingerprint: "0002-A-2-Z",
|
||||
values: []sample{
|
||||
{
|
||||
time: testInstant.Add(-1 * 69 * time.Minute),
|
||||
value: 21,
|
||||
},
|
||||
},
|
||||
},
|
||||
sampleGroup{
|
||||
fingerprint: "0002-A-2-Z",
|
||||
values: []sample{
|
||||
{
|
||||
time: testInstant.Add(-1 * 68 * time.Minute),
|
||||
value: 22,
|
||||
},
|
||||
},
|
||||
},
|
||||
sampleGroup{
|
||||
fingerprint: "0002-A-2-Z",
|
||||
values: []sample{
|
||||
{
|
||||
time: testInstant.Add(-1 * 67 * time.Minute),
|
||||
value: 23,
|
||||
},
|
||||
},
|
||||
},
|
||||
sampleGroup{
|
||||
fingerprint: "0002-A-2-Z",
|
||||
values: []sample{
|
||||
{
|
||||
time: testInstant.Add(-1 * 66 * time.Minute),
|
||||
value: 24,
|
||||
},
|
||||
},
|
||||
},
|
||||
sampleGroup{
|
||||
fingerprint: "0002-A-2-Z",
|
||||
values: []sample{
|
||||
{
|
||||
time: testInstant.Add(-1 * 65 * time.Minute),
|
||||
value: 25,
|
||||
},
|
||||
},
|
||||
},
|
||||
sampleGroup{
|
||||
fingerprint: "0002-A-2-Z",
|
||||
values: []sample{
|
||||
{
|
||||
time: testInstant.Add(-1 * 64 * time.Minute),
|
||||
value: 26,
|
||||
},
|
||||
},
|
||||
},
|
||||
sampleGroup{
|
||||
fingerprint: "0002-A-2-Z",
|
||||
values: []sample{
|
||||
{
|
||||
time: testInstant.Add(-1 * 63 * time.Minute),
|
||||
value: 27,
|
||||
},
|
||||
},
|
||||
},
|
||||
sampleGroup{
|
||||
fingerprint: "0002-A-2-Z",
|
||||
values: []sample{
|
||||
{
|
||||
time: testInstant.Add(-1 * 62 * time.Minute),
|
||||
value: 28,
|
||||
},
|
||||
},
|
||||
},
|
||||
sampleGroup{
|
||||
fingerprint: "0002-A-2-Z",
|
||||
values: []sample{
|
||||
{
|
||||
time: testInstant.Add(-1 * 61 * time.Minute),
|
||||
value: 29,
|
||||
},
|
||||
},
|
||||
},
|
||||
sampleGroup{
|
||||
fingerprint: "0002-A-2-Z",
|
||||
values: []sample{
|
||||
{
|
||||
time: testInstant.Add(-1 * 60 * time.Minute),
|
||||
value: 30,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
)
|
||||
|
||||
for _, scenario := range scenarios {
|
||||
curatorDirectory := fixture.NewPreparer(t).Prepare("curator", fixture.NewCassetteFactory(scenario.in.curationStates))
|
||||
defer curatorDirectory.Close()
|
||||
|
||||
watermarkDirectory := fixture.NewPreparer(t).Prepare("watermark", fixture.NewCassetteFactory(scenario.in.watermarkStates))
|
||||
defer watermarkDirectory.Close()
|
||||
|
||||
sampleDirectory := fixture.NewPreparer(t).Prepare("sample", fixture.NewCassetteFactory(scenario.in.sampleGroups))
|
||||
defer sampleDirectory.Close()
|
||||
|
||||
curatorStates, err := leveldb.NewLevelDBPersistence(curatorDirectory.Path(), 0, 0)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer curatorStates.Close()
|
||||
|
||||
watermarkStates, err := leveldb.NewLevelDBPersistence(watermarkDirectory.Path(), 0, 0)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer watermarkStates.Close()
|
||||
|
||||
samples, err := leveldb.NewLevelDBPersistence(sampleDirectory.Path(), 0, 0)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer samples.Close()
|
||||
|
||||
c := newCurator(scenario.in.recencyThreshold, scenario.in.groupSize, curatorStates, samples, watermarkStates)
|
||||
c.run(testInstant)
|
||||
}
|
||||
}
|
|
@ -51,7 +51,7 @@ func newDiskFrontier(i leveldb.Iterator) (d *diskFrontier, err error) {
|
|||
|
||||
lastKey, err := extractSampleKey(i)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
panic(fmt.Sprintln(err, i.Key(), i.Value()))
|
||||
}
|
||||
|
||||
if !i.SeekToFirst() || i.Key() == nil {
|
||||
|
@ -90,10 +90,8 @@ func (f seriesFrontier) String() string {
|
|||
// fingerprint. A nil diskFrontier will be returned if the series cannot
|
||||
// be found in the store.
|
||||
func newSeriesFrontier(f model.Fingerprint, d diskFrontier, i leveldb.Iterator) (s *seriesFrontier, err error) {
|
||||
var (
|
||||
lowerSeek = firstSupertime
|
||||
upperSeek = lastSupertime
|
||||
)
|
||||
lowerSeek := firstSupertime
|
||||
upperSeek := lastSupertime
|
||||
|
||||
// If the diskFrontier for this iterator says that the candidate fingerprint
|
||||
// is outside of its seeking domain, there is no way that a seriesFrontier
|
||||
|
@ -180,3 +178,44 @@ func newSeriesFrontier(f model.Fingerprint, d diskFrontier, i leveldb.Iterator)
|
|||
|
||||
return
|
||||
}
|
||||
|
||||
// Contains indicates whether a given time value is within the recorded
|
||||
// interval.
|
||||
func (s seriesFrontier) Contains(t time.Time) bool {
|
||||
return !(t.Before(s.firstSupertime) || t.After(s.lastTime))
|
||||
}
|
||||
|
||||
// InSafeSeekRange indicates whether the time is within the recorded time range
|
||||
// and is safely seekable such that a seek does not result in an iterator point
|
||||
// after the last value of the series or outside of the entire store.
|
||||
func (s seriesFrontier) InSafeSeekRange(t time.Time) (safe bool) {
|
||||
if !s.Contains(t) {
|
||||
return
|
||||
}
|
||||
|
||||
if s.lastSupertime.Before(t) {
|
||||
return
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
func (s seriesFrontier) After(t time.Time) bool {
|
||||
return s.firstSupertime.After(t)
|
||||
}
|
||||
|
||||
// optimalStartTime indicates what the best start time for a curation operation
|
||||
// should be given the curation remark.
|
||||
func (s seriesFrontier) optimalStartTime(remark *model.CurationRemark) (t time.Time) {
|
||||
switch {
|
||||
case remark == nil:
|
||||
t = s.firstSupertime
|
||||
case s.After(remark.LastCompletionTimestamp):
|
||||
t = s.firstSupertime
|
||||
case !s.InSafeSeekRange(remark.LastCompletionTimestamp):
|
||||
t = s.lastSupertime
|
||||
default:
|
||||
t = remark.LastCompletionTimestamp
|
||||
}
|
||||
return
|
||||
}
|
||||
|
|
|
@ -54,6 +54,9 @@ const (
|
|||
setLabelNameFingerprints = "set_label_name_fingerprints"
|
||||
setLabelPairFingerprints = "set_label_pair_fingerprints"
|
||||
writeMemory = "write_memory"
|
||||
|
||||
cutOff = "recency_threshold"
|
||||
processorName = "processor"
|
||||
)
|
||||
|
||||
var (
|
||||
|
@ -65,6 +68,7 @@ var (
|
|||
|
||||
curationDuration = metrics.NewCounter()
|
||||
curationDurations = metrics.NewHistogram(diskLatencyHistogram)
|
||||
curationFilterOperations = metrics.NewCounter()
|
||||
storageOperations = metrics.NewCounter()
|
||||
storageOperationDurations = metrics.NewCounter()
|
||||
storageLatency = metrics.NewHistogram(diskLatencyHistogram)
|
||||
|
@ -88,4 +92,5 @@ func init() {
|
|||
registry.Register("prometheus_metric_disk_latency_microseconds", "Latency for metric disk operations in microseconds.", registry.NilLabels, storageLatency)
|
||||
registry.Register("prometheus_storage_operation_time_total_microseconds", "The total time spent performing a given storage operation.", registry.NilLabels, storageOperationDurations)
|
||||
registry.Register("prometheus_storage_queue_sizes_total", "The various sizes and capacities of the storage queues.", registry.NilLabels, queueSizes)
|
||||
registry.Register("curation_filter_operations_total", "The number of curation filter operations completed.", registry.NilLabels, curationFilterOperations)
|
||||
}
|
||||
|
|
|
@ -164,12 +164,11 @@ func NewLevelDBMetricPersistence(baseDirectory string) (persistence *LevelDBMetr
|
|||
}
|
||||
|
||||
func (l *LevelDBMetricPersistence) AppendSample(sample model.Sample) (err error) {
|
||||
begin := time.Now()
|
||||
defer func() {
|
||||
defer func(begin time.Time) {
|
||||
duration := time.Since(begin)
|
||||
|
||||
recordOutcome(duration, err, map[string]string{operation: appendSample, result: success}, map[string]string{operation: appendSample, result: failure})
|
||||
}()
|
||||
}(time.Now())
|
||||
|
||||
err = l.AppendSamples(model.Samples{sample})
|
||||
|
||||
|
@ -220,23 +219,20 @@ func groupByFingerprint(samples model.Samples) map[model.Fingerprint]model.Sampl
|
|||
// in the keyspace and returns a map of Fingerprint-Metric pairs that are
|
||||
// absent.
|
||||
func (l *LevelDBMetricPersistence) findUnindexedMetrics(candidates map[model.Fingerprint]model.Metric) (unindexed map[model.Fingerprint]model.Metric, err error) {
|
||||
begin := time.Now()
|
||||
defer func() {
|
||||
defer func(begin time.Time) {
|
||||
duration := time.Since(begin)
|
||||
|
||||
recordOutcome(duration, err, map[string]string{operation: findUnindexedMetrics, result: success}, map[string]string{operation: findUnindexedMetrics, result: failure})
|
||||
}()
|
||||
}(time.Now())
|
||||
|
||||
unindexed = make(map[model.Fingerprint]model.Metric)
|
||||
|
||||
// Determine which metrics are unknown in the database.
|
||||
for fingerprint, metric := range candidates {
|
||||
var (
|
||||
dto = model.MetricToDTO(metric)
|
||||
indexHas, err = l.hasIndexMetric(dto)
|
||||
)
|
||||
dto := model.MetricToDTO(metric)
|
||||
indexHas, err := l.hasIndexMetric(dto)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
return unindexed, err
|
||||
}
|
||||
if !indexHas {
|
||||
unindexed[fingerprint] = metric
|
||||
|
@ -252,12 +248,11 @@ func (l *LevelDBMetricPersistence) findUnindexedMetrics(candidates map[model.Fin
|
|||
//
|
||||
// This operation is idempotent.
|
||||
func (l *LevelDBMetricPersistence) indexLabelNames(metrics map[model.Fingerprint]model.Metric) (err error) {
|
||||
begin := time.Now()
|
||||
defer func() {
|
||||
defer func(begin time.Time) {
|
||||
duration := time.Since(begin)
|
||||
|
||||
recordOutcome(duration, err, map[string]string{operation: indexLabelNames, result: success}, map[string]string{operation: indexLabelNames, result: failure})
|
||||
}()
|
||||
}(time.Now())
|
||||
|
||||
labelNameFingerprints := map[model.LabelName]utility.Set{}
|
||||
|
||||
|
@ -269,7 +264,6 @@ func (l *LevelDBMetricPersistence) indexLabelNames(metrics map[model.Fingerprint
|
|||
|
||||
fingerprints, err := l.GetFingerprintsForLabelName(labelName)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -307,7 +301,6 @@ func (l *LevelDBMetricPersistence) indexLabelNames(metrics map[model.Fingerprint
|
|||
|
||||
err = l.labelNameToFingerprints.Commit(batch)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -320,12 +313,11 @@ func (l *LevelDBMetricPersistence) indexLabelNames(metrics map[model.Fingerprint
|
|||
//
|
||||
// This operation is idempotent.
|
||||
func (l *LevelDBMetricPersistence) indexLabelPairs(metrics map[model.Fingerprint]model.Metric) (err error) {
|
||||
begin := time.Now()
|
||||
defer func() {
|
||||
defer func(begin time.Time) {
|
||||
duration := time.Since(begin)
|
||||
|
||||
recordOutcome(duration, err, map[string]string{operation: indexLabelPairs, result: success}, map[string]string{operation: indexLabelPairs, result: failure})
|
||||
}()
|
||||
}(time.Now())
|
||||
|
||||
labelPairFingerprints := map[model.LabelPair]utility.Set{}
|
||||
|
||||
|
@ -343,7 +335,6 @@ func (l *LevelDBMetricPersistence) indexLabelPairs(metrics map[model.Fingerprint
|
|||
labelName: labelValue,
|
||||
})
|
||||
if err != nil {
|
||||
panic(err)
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -382,7 +373,6 @@ func (l *LevelDBMetricPersistence) indexLabelPairs(metrics map[model.Fingerprint
|
|||
|
||||
err = l.labelSetToFingerprints.Commit(batch)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -394,12 +384,11 @@ func (l *LevelDBMetricPersistence) indexLabelPairs(metrics map[model.Fingerprint
|
|||
//
|
||||
// This operation is idempotent.
|
||||
func (l *LevelDBMetricPersistence) indexFingerprints(metrics map[model.Fingerprint]model.Metric) (err error) {
|
||||
begin := time.Now()
|
||||
defer func() {
|
||||
defer func(begin time.Time) {
|
||||
duration := time.Since(begin)
|
||||
|
||||
recordOutcome(duration, err, map[string]string{operation: indexFingerprints, result: success}, map[string]string{operation: indexFingerprints, result: failure})
|
||||
}()
|
||||
}(time.Now())
|
||||
|
||||
batch := leveldb.NewBatch()
|
||||
defer batch.Close()
|
||||
|
@ -412,7 +401,7 @@ func (l *LevelDBMetricPersistence) indexFingerprints(metrics map[model.Fingerpri
|
|||
|
||||
err = l.fingerprintToMetrics.Commit(batch)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
return
|
||||
}
|
||||
|
||||
return
|
||||
|
@ -422,12 +411,11 @@ func (l *LevelDBMetricPersistence) indexFingerprints(metrics map[model.Fingerpri
|
|||
// that are unknown to the storage stack, and then proceeds to update all
|
||||
// affected indices.
|
||||
func (l *LevelDBMetricPersistence) indexMetrics(fingerprints map[model.Fingerprint]model.Metric) (err error) {
|
||||
begin := time.Now()
|
||||
defer func() {
|
||||
defer func(begin time.Time) {
|
||||
duration := time.Since(begin)
|
||||
|
||||
recordOutcome(duration, err, map[string]string{operation: indexMetrics, result: success}, map[string]string{operation: indexMetrics, result: failure})
|
||||
}()
|
||||
}(time.Now())
|
||||
|
||||
var (
|
||||
absentMetrics map[model.Fingerprint]model.Metric
|
||||
|
@ -435,7 +423,7 @@ func (l *LevelDBMetricPersistence) indexMetrics(fingerprints map[model.Fingerpri
|
|||
|
||||
absentMetrics, err = l.findUnindexedMetrics(fingerprints)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
return
|
||||
}
|
||||
|
||||
if len(absentMetrics) == 0 {
|
||||
|
@ -444,11 +432,9 @@ func (l *LevelDBMetricPersistence) indexMetrics(fingerprints map[model.Fingerpri
|
|||
|
||||
// TODO: For the missing fingerprints, determine what label names and pairs
|
||||
// are absent and act accordingly and append fingerprints.
|
||||
var (
|
||||
doneBuildingLabelNameIndex = make(chan error)
|
||||
doneBuildingLabelPairIndex = make(chan error)
|
||||
doneBuildingFingerprintIndex = make(chan error)
|
||||
)
|
||||
doneBuildingLabelNameIndex := make(chan error)
|
||||
doneBuildingLabelPairIndex := make(chan error)
|
||||
doneBuildingFingerprintIndex := make(chan error)
|
||||
|
||||
go func() {
|
||||
doneBuildingLabelNameIndex <- l.indexLabelNames(absentMetrics)
|
||||
|
@ -462,22 +448,17 @@ func (l *LevelDBMetricPersistence) indexMetrics(fingerprints map[model.Fingerpri
|
|||
doneBuildingFingerprintIndex <- l.indexFingerprints(absentMetrics)
|
||||
}()
|
||||
|
||||
makeTopLevelIndex := true
|
||||
|
||||
err = <-doneBuildingLabelNameIndex
|
||||
if err != nil {
|
||||
panic(err)
|
||||
makeTopLevelIndex = false
|
||||
return
|
||||
}
|
||||
err = <-doneBuildingLabelPairIndex
|
||||
if err != nil {
|
||||
panic(err)
|
||||
makeTopLevelIndex = false
|
||||
return
|
||||
}
|
||||
err = <-doneBuildingFingerprintIndex
|
||||
if err != nil {
|
||||
panic(err)
|
||||
makeTopLevelIndex = false
|
||||
return
|
||||
}
|
||||
|
||||
// If any of the preceding operations failed, we will have inconsistent
|
||||
|
@ -485,62 +466,51 @@ func (l *LevelDBMetricPersistence) indexMetrics(fingerprints map[model.Fingerpri
|
|||
// its state is used to determine whether to bulk update the other indices.
|
||||
// Given that those operations are idempotent, it is OK to repeat them;
|
||||
// however, it will consume considerable amounts of time.
|
||||
if makeTopLevelIndex {
|
||||
batch := leveldb.NewBatch()
|
||||
defer batch.Close()
|
||||
batch := leveldb.NewBatch()
|
||||
defer batch.Close()
|
||||
|
||||
// WART: We should probably encode simple fingerprints.
|
||||
for _, metric := range absentMetrics {
|
||||
key := coding.NewProtocolBuffer(model.MetricToDTO(metric))
|
||||
batch.Put(key, key)
|
||||
}
|
||||
// WART: We should probably encode simple fingerprints.
|
||||
for _, metric := range absentMetrics {
|
||||
key := coding.NewProtocolBuffer(model.MetricToDTO(metric))
|
||||
batch.Put(key, key)
|
||||
}
|
||||
|
||||
err := l.metricMembershipIndex.Commit(batch)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
// Not critical.
|
||||
log.Println(err)
|
||||
}
|
||||
err = l.metricMembershipIndex.Commit(batch)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func (l *LevelDBMetricPersistence) refreshHighWatermarks(groups map[model.Fingerprint]model.Samples) (err error) {
|
||||
begin := time.Now()
|
||||
defer func() {
|
||||
defer func(begin time.Time) {
|
||||
duration := time.Since(begin)
|
||||
|
||||
recordOutcome(duration, err, map[string]string{operation: refreshHighWatermarks, result: success}, map[string]string{operation: refreshHighWatermarks, result: failure})
|
||||
}()
|
||||
}(time.Now())
|
||||
|
||||
batch := leveldb.NewBatch()
|
||||
defer batch.Close()
|
||||
|
||||
var (
|
||||
mutationCount = 0
|
||||
)
|
||||
mutationCount := 0
|
||||
for fingerprint, samples := range groups {
|
||||
var (
|
||||
key = &dto.Fingerprint{}
|
||||
value = &dto.MetricHighWatermark{}
|
||||
raw []byte
|
||||
newestSampleTimestamp = samples[len(samples)-1].Timestamp
|
||||
keyEncoded = coding.NewProtocolBuffer(key)
|
||||
)
|
||||
key := &dto.Fingerprint{}
|
||||
value := &dto.MetricHighWatermark{}
|
||||
raw := []byte{}
|
||||
newestSampleTimestamp := samples[len(samples)-1].Timestamp
|
||||
keyEncoded := coding.NewProtocolBuffer(key)
|
||||
|
||||
key.Signature = proto.String(fingerprint.ToRowKey())
|
||||
raw, err = l.metricHighWatermarks.Get(keyEncoded)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
return
|
||||
}
|
||||
|
||||
if raw != nil {
|
||||
err = proto.Unmarshal(raw, value)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
continue
|
||||
return
|
||||
}
|
||||
|
||||
if newestSampleTimestamp.Before(time.Unix(*value.Timestamp, 0)) {
|
||||
|
@ -554,19 +524,18 @@ func (l *LevelDBMetricPersistence) refreshHighWatermarks(groups map[model.Finger
|
|||
|
||||
err = l.metricHighWatermarks.Commit(batch)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
return
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func (l *LevelDBMetricPersistence) AppendSamples(samples model.Samples) (err error) {
|
||||
begin := time.Now()
|
||||
defer func() {
|
||||
defer func(begin time.Time) {
|
||||
duration := time.Since(begin)
|
||||
|
||||
recordOutcome(duration, err, map[string]string{operation: appendSamples, result: success}, map[string]string{operation: appendSamples, result: failure})
|
||||
}()
|
||||
}(time.Now())
|
||||
|
||||
var (
|
||||
fingerprintToSamples = groupByFingerprint(samples)
|
||||
|
@ -630,17 +599,17 @@ func (l *LevelDBMetricPersistence) AppendSamples(samples model.Samples) (err err
|
|||
|
||||
err = l.metricSamples.Commit(samplesBatch)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
return
|
||||
}
|
||||
|
||||
err = <-indexErrChan
|
||||
if err != nil {
|
||||
panic(err)
|
||||
return
|
||||
}
|
||||
|
||||
err = <-watermarkErrChan
|
||||
if err != nil {
|
||||
panic(err)
|
||||
return
|
||||
}
|
||||
|
||||
return
|
||||
|
@ -691,13 +660,11 @@ func fingerprintsEqual(l *dto.Fingerprint, r *dto.Fingerprint) bool {
|
|||
}
|
||||
|
||||
func (l *LevelDBMetricPersistence) hasIndexMetric(dto *dto.Metric) (value bool, err error) {
|
||||
begin := time.Now()
|
||||
|
||||
defer func() {
|
||||
defer func(begin time.Time) {
|
||||
duration := time.Since(begin)
|
||||
|
||||
recordOutcome(duration, err, map[string]string{operation: hasIndexMetric, result: success}, map[string]string{operation: hasIndexMetric, result: failure})
|
||||
}()
|
||||
}(time.Now())
|
||||
|
||||
dtoKey := coding.NewProtocolBuffer(dto)
|
||||
value, err = l.metricMembershipIndex.Has(dtoKey)
|
||||
|
@ -706,13 +673,11 @@ func (l *LevelDBMetricPersistence) hasIndexMetric(dto *dto.Metric) (value bool,
|
|||
}
|
||||
|
||||
func (l *LevelDBMetricPersistence) HasLabelPair(dto *dto.LabelPair) (value bool, err error) {
|
||||
begin := time.Now()
|
||||
|
||||
defer func() {
|
||||
defer func(begin time.Time) {
|
||||
duration := time.Since(begin)
|
||||
|
||||
recordOutcome(duration, err, map[string]string{operation: hasLabelPair, result: success}, map[string]string{operation: hasLabelPair, result: failure})
|
||||
}()
|
||||
}(time.Now())
|
||||
|
||||
dtoKey := coding.NewProtocolBuffer(dto)
|
||||
value, err = l.labelSetToFingerprints.Has(dtoKey)
|
||||
|
@ -721,13 +686,11 @@ func (l *LevelDBMetricPersistence) HasLabelPair(dto *dto.LabelPair) (value bool,
|
|||
}
|
||||
|
||||
func (l *LevelDBMetricPersistence) HasLabelName(dto *dto.LabelName) (value bool, err error) {
|
||||
begin := time.Now()
|
||||
|
||||
defer func() {
|
||||
defer func(begin time.Time) {
|
||||
duration := time.Since(begin)
|
||||
|
||||
recordOutcome(duration, err, map[string]string{operation: hasLabelName, result: success}, map[string]string{operation: hasLabelName, result: failure})
|
||||
}()
|
||||
}(time.Now())
|
||||
|
||||
dtoKey := coding.NewProtocolBuffer(dto)
|
||||
value, err = l.labelNameToFingerprints.Has(dtoKey)
|
||||
|
@ -736,13 +699,11 @@ func (l *LevelDBMetricPersistence) HasLabelName(dto *dto.LabelName) (value bool,
|
|||
}
|
||||
|
||||
func (l *LevelDBMetricPersistence) GetFingerprintsForLabelSet(labelSet model.LabelSet) (fps model.Fingerprints, err error) {
|
||||
begin := time.Now()
|
||||
|
||||
defer func() {
|
||||
defer func(begin time.Time) {
|
||||
duration := time.Since(begin)
|
||||
|
||||
recordOutcome(duration, err, map[string]string{operation: getFingerprintsForLabelSet, result: success}, map[string]string{operation: getFingerprintsForLabelSet, result: failure})
|
||||
}()
|
||||
}(time.Now())
|
||||
|
||||
sets := []utility.Set{}
|
||||
|
||||
|
@ -786,13 +747,11 @@ func (l *LevelDBMetricPersistence) GetFingerprintsForLabelSet(labelSet model.Lab
|
|||
}
|
||||
|
||||
func (l *LevelDBMetricPersistence) GetFingerprintsForLabelName(labelName model.LabelName) (fps model.Fingerprints, err error) {
|
||||
begin := time.Now()
|
||||
|
||||
defer func() {
|
||||
defer func(begin time.Time) {
|
||||
duration := time.Since(begin)
|
||||
|
||||
recordOutcome(duration, err, map[string]string{operation: getFingerprintsForLabelName, result: success}, map[string]string{operation: getFingerprintsForLabelName, result: failure})
|
||||
}()
|
||||
}(time.Now())
|
||||
|
||||
raw, err := l.labelNameToFingerprints.Get(coding.NewProtocolBuffer(model.LabelNameToDTO(&labelName)))
|
||||
if err != nil {
|
||||
|
@ -815,13 +774,11 @@ func (l *LevelDBMetricPersistence) GetFingerprintsForLabelName(labelName model.L
|
|||
}
|
||||
|
||||
func (l *LevelDBMetricPersistence) GetMetricForFingerprint(f model.Fingerprint) (m *model.Metric, err error) {
|
||||
begin := time.Now()
|
||||
|
||||
defer func() {
|
||||
defer func(begin time.Time) {
|
||||
duration := time.Since(begin)
|
||||
|
||||
recordOutcome(duration, err, map[string]string{operation: getMetricForFingerprint, result: success}, map[string]string{operation: getMetricForFingerprint, result: failure})
|
||||
}()
|
||||
}(time.Now())
|
||||
|
||||
raw, err := l.fingerprintToMetrics.Get(coding.NewProtocolBuffer(model.FingerprintToDTO(f)))
|
||||
if err != nil {
|
||||
|
|
233
storage/metric/processor.go
Normal file
233
storage/metric/processor.go
Normal file
|
@ -0,0 +1,233 @@
|
|||
// Copyright 2013 Prometheus Team
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package metric
|
||||
|
||||
import (
|
||||
"code.google.com/p/goprotobuf/proto"
|
||||
"fmt"
|
||||
"github.com/prometheus/prometheus/coding"
|
||||
"github.com/prometheus/prometheus/model"
|
||||
dto "github.com/prometheus/prometheus/model/generated"
|
||||
"github.com/prometheus/prometheus/storage/raw"
|
||||
"github.com/prometheus/prometheus/storage/raw/leveldb"
|
||||
"time"
|
||||
)
|
||||
|
||||
// processor models a post-processing agent that performs work given a sample
|
||||
// corpus.
|
||||
type processor interface {
|
||||
// Name emits the name of this processor's signature encoder. It must be
|
||||
// fully-qualified in the sense that it could be used via a Protocol Buffer
|
||||
// registry to extract the descriptor to reassemble this message.
|
||||
Name() string
|
||||
// Signature emits a byte signature for this process for the purpose of
|
||||
// remarking how far along it has been applied to the database.
|
||||
Signature() (signature []byte, err error)
|
||||
// Apply runs this processor against the sample set. sampleIterator expects
|
||||
// to be pre-seeked to the initial starting position. The processor will
|
||||
// run until up until stopAt has been reached. It is imperative that the
|
||||
// provided stopAt is within the interval of the series frontier.
|
||||
//
|
||||
// Upon completion or error, the last time at which the processor finished
|
||||
// shall be emitted in addition to any errors.
|
||||
Apply(sampleIterator leveldb.Iterator, samples raw.Persistence, stopAt time.Time, fingerprint model.Fingerprint) (lastCurated time.Time, err error)
|
||||
}
|
||||
|
||||
// compactionProcessor combines sparse values in the database together such
|
||||
// that at least MinimumGroupSize is fulfilled from the
|
||||
type compactionProcessor struct {
|
||||
// MaximumMutationPoolBatch represents approximately the largest pending
|
||||
// batch of mutation operations for the database before pausing to
|
||||
// commit before resumption.
|
||||
//
|
||||
// A reasonable value would be (MinimumGroupSize * 2) + 1.
|
||||
MaximumMutationPoolBatch int
|
||||
// MinimumGroupSize represents the smallest allowed sample chunk size in the
|
||||
// database.
|
||||
MinimumGroupSize int
|
||||
// signature is the byte representation of the compactionProcessor's settings,
|
||||
// used for purely memoization purposes across an instance.
|
||||
signature []byte
|
||||
}
|
||||
|
||||
func (p compactionProcessor) Name() string {
|
||||
return "io.prometheus.CompactionProcessorDefinition"
|
||||
}
|
||||
|
||||
func (p *compactionProcessor) Signature() (out []byte, err error) {
|
||||
if len(p.signature) == 0 {
|
||||
out, err = proto.Marshal(&dto.CompactionProcessorDefinition{
|
||||
MinimumGroupSize: proto.Uint32(uint32(p.MinimumGroupSize)),
|
||||
})
|
||||
|
||||
p.signature = out
|
||||
}
|
||||
|
||||
out = p.signature
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func (p compactionProcessor) String() string {
|
||||
return fmt.Sprintf("compactionProcess for minimum group size %d", p.MinimumGroupSize)
|
||||
}
|
||||
|
||||
func (p compactionProcessor) Apply(sampleIterator leveldb.Iterator, samples raw.Persistence, stopAt time.Time, fingerprint model.Fingerprint) (lastCurated time.Time, err error) {
|
||||
var pendingBatch raw.Batch = nil
|
||||
|
||||
defer func() {
|
||||
if pendingBatch != nil {
|
||||
pendingBatch.Close()
|
||||
}
|
||||
}()
|
||||
|
||||
var pendingMutations = 0
|
||||
var pendingSamples model.Values
|
||||
var sampleKey model.SampleKey
|
||||
var sampleValues model.Values
|
||||
var lastTouchedTime time.Time
|
||||
var keyDropped bool
|
||||
|
||||
sampleKey, err = extractSampleKey(sampleIterator)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
sampleValues, err = extractSampleValues(sampleIterator)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
for lastCurated.Before(stopAt) && lastTouchedTime.Before(stopAt) {
|
||||
switch {
|
||||
// Furnish a new pending batch operation if none is available.
|
||||
case pendingBatch == nil:
|
||||
pendingBatch = leveldb.NewBatch()
|
||||
|
||||
// If there are no sample values to extract from the datastore, let's
|
||||
// continue extracting more values to use. We know that the time.Before()
|
||||
// block would prevent us from going into unsafe territory.
|
||||
case len(sampleValues) == 0:
|
||||
if !sampleIterator.Next() {
|
||||
return lastCurated, fmt.Errorf("Illegal Condition: Invalid Iterator on Continuation")
|
||||
}
|
||||
|
||||
keyDropped = false
|
||||
|
||||
sampleKey, err = extractSampleKey(sampleIterator)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
sampleValues, err = extractSampleValues(sampleIterator)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
// If the number of pending mutations exceeds the allowed batch amount,
|
||||
// commit to disk and delete the batch. A new one will be recreated if
|
||||
// necessary.
|
||||
case pendingMutations >= p.MaximumMutationPoolBatch:
|
||||
err = samples.Commit(pendingBatch)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
pendingMutations = 0
|
||||
|
||||
pendingBatch.Close()
|
||||
pendingBatch = nil
|
||||
|
||||
case len(pendingSamples) == 0 && len(sampleValues) >= p.MinimumGroupSize:
|
||||
lastTouchedTime = sampleValues[len(sampleValues)-1].Timestamp
|
||||
sampleValues = model.Values{}
|
||||
|
||||
case len(pendingSamples)+len(sampleValues) < p.MinimumGroupSize:
|
||||
if !keyDropped {
|
||||
key := coding.NewProtocolBuffer(sampleKey.ToDTO())
|
||||
pendingBatch.Drop(key)
|
||||
keyDropped = true
|
||||
}
|
||||
pendingSamples = append(pendingSamples, sampleValues...)
|
||||
lastTouchedTime = sampleValues[len(sampleValues)-1].Timestamp
|
||||
sampleValues = model.Values{}
|
||||
pendingMutations++
|
||||
|
||||
// If the number of pending writes equals the target group size
|
||||
case len(pendingSamples) == p.MinimumGroupSize:
|
||||
newSampleKey := pendingSamples.ToSampleKey(fingerprint)
|
||||
key := coding.NewProtocolBuffer(newSampleKey.ToDTO())
|
||||
value := coding.NewProtocolBuffer(pendingSamples.ToDTO())
|
||||
pendingBatch.Put(key, value)
|
||||
pendingMutations++
|
||||
lastCurated = newSampleKey.FirstTimestamp.In(time.UTC)
|
||||
if len(sampleValues) > 0 {
|
||||
if !keyDropped {
|
||||
key := coding.NewProtocolBuffer(sampleKey.ToDTO())
|
||||
pendingBatch.Drop(key)
|
||||
keyDropped = true
|
||||
}
|
||||
|
||||
if len(sampleValues) > p.MinimumGroupSize {
|
||||
pendingSamples = sampleValues[:p.MinimumGroupSize]
|
||||
sampleValues = sampleValues[p.MinimumGroupSize:]
|
||||
lastTouchedTime = sampleValues[len(sampleValues)-1].Timestamp
|
||||
} else {
|
||||
pendingSamples = sampleValues
|
||||
lastTouchedTime = pendingSamples[len(pendingSamples)-1].Timestamp
|
||||
sampleValues = model.Values{}
|
||||
}
|
||||
}
|
||||
|
||||
case len(pendingSamples)+len(sampleValues) >= p.MinimumGroupSize:
|
||||
if !keyDropped {
|
||||
key := coding.NewProtocolBuffer(sampleKey.ToDTO())
|
||||
pendingBatch.Drop(key)
|
||||
keyDropped = true
|
||||
}
|
||||
remainder := p.MinimumGroupSize - len(pendingSamples)
|
||||
pendingSamples = append(pendingSamples, sampleValues[:remainder]...)
|
||||
sampleValues = sampleValues[remainder:]
|
||||
if len(sampleValues) == 0 {
|
||||
lastTouchedTime = pendingSamples[len(pendingSamples)-1].Timestamp
|
||||
} else {
|
||||
lastTouchedTime = sampleValues[len(sampleValues)-1].Timestamp
|
||||
}
|
||||
pendingMutations++
|
||||
|
||||
default:
|
||||
err = fmt.Errorf("Unhandled processing case.")
|
||||
}
|
||||
}
|
||||
|
||||
if len(sampleValues) > 0 || len(pendingSamples) > 0 {
|
||||
pendingSamples = append(sampleValues, pendingSamples...)
|
||||
newSampleKey := pendingSamples.ToSampleKey(fingerprint)
|
||||
key := coding.NewProtocolBuffer(newSampleKey.ToDTO())
|
||||
value := coding.NewProtocolBuffer(pendingSamples.ToDTO())
|
||||
pendingBatch.Put(key, value)
|
||||
pendingSamples = model.Values{}
|
||||
pendingMutations++
|
||||
lastCurated = newSampleKey.FirstTimestamp.In(time.UTC)
|
||||
}
|
||||
|
||||
// This is not deferred due to the off-chance that a pre-existing commit
|
||||
// failed.
|
||||
if pendingBatch != nil && pendingMutations > 0 {
|
||||
err = samples.Commit(pendingBatch)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
return
|
||||
}
|
962
storage/metric/processor_test.go
Normal file
962
storage/metric/processor_test.go
Normal file
|
@ -0,0 +1,962 @@
|
|||
// Copyright 2013 Prometheus Team
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package metric
|
||||
|
||||
import (
|
||||
"code.google.com/p/goprotobuf/proto"
|
||||
"fmt"
|
||||
"github.com/prometheus/prometheus/coding"
|
||||
"github.com/prometheus/prometheus/model"
|
||||
dto "github.com/prometheus/prometheus/model/generated"
|
||||
"github.com/prometheus/prometheus/storage/raw/leveldb"
|
||||
fixture "github.com/prometheus/prometheus/storage/raw/leveldb/test"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
type curationState struct {
|
||||
fingerprint string
|
||||
ignoreYoungerThan time.Duration
|
||||
lastCurated time.Time
|
||||
processor processor
|
||||
}
|
||||
|
||||
type watermarkState struct {
|
||||
fingerprint string
|
||||
lastAppended time.Time
|
||||
}
|
||||
|
||||
type sampleGroup struct {
|
||||
fingerprint string
|
||||
values model.Values
|
||||
}
|
||||
|
||||
type in struct {
|
||||
curationStates fixture.Pairs
|
||||
watermarkStates fixture.Pairs
|
||||
sampleGroups fixture.Pairs
|
||||
ignoreYoungerThan time.Duration
|
||||
groupSize uint32
|
||||
processor processor
|
||||
}
|
||||
|
||||
type out struct {
|
||||
curationStates []curationState
|
||||
sampleGroups []sampleGroup
|
||||
}
|
||||
|
||||
func (c curationState) Get() (key, value coding.Encoder) {
|
||||
signature, err := c.processor.Signature()
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
key = coding.NewProtocolBuffer(model.CurationKey{
|
||||
Fingerprint: model.NewFingerprintFromRowKey(c.fingerprint),
|
||||
ProcessorMessageRaw: signature,
|
||||
ProcessorMessageTypeName: c.processor.Name(),
|
||||
IgnoreYoungerThan: c.ignoreYoungerThan,
|
||||
}.ToDTO())
|
||||
|
||||
value = coding.NewProtocolBuffer(model.CurationRemark{
|
||||
LastCompletionTimestamp: c.lastCurated,
|
||||
}.ToDTO())
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func (w watermarkState) Get() (key, value coding.Encoder) {
|
||||
key = coding.NewProtocolBuffer(model.NewFingerprintFromRowKey(w.fingerprint).ToDTO())
|
||||
value = coding.NewProtocolBuffer(model.NewWatermarkFromTime(w.lastAppended).ToMetricHighWatermarkDTO())
|
||||
return
|
||||
}
|
||||
|
||||
func (s sampleGroup) Get() (key, value coding.Encoder) {
|
||||
key = coding.NewProtocolBuffer(model.SampleKey{
|
||||
Fingerprint: model.NewFingerprintFromRowKey(s.fingerprint),
|
||||
FirstTimestamp: s.values[0].Timestamp,
|
||||
LastTimestamp: s.values[len(s.values)-1].Timestamp,
|
||||
SampleCount: uint32(len(s.values)),
|
||||
}.ToDTO())
|
||||
|
||||
value = coding.NewProtocolBuffer(s.values.ToDTO())
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func TestCuratorCompactionProcessor(t *testing.T) {
|
||||
scenarios := []struct {
|
||||
in in
|
||||
out out
|
||||
}{
|
||||
{
|
||||
in: in{
|
||||
processor: &compactionProcessor{
|
||||
MinimumGroupSize: 5,
|
||||
MaximumMutationPoolBatch: 15,
|
||||
},
|
||||
ignoreYoungerThan: 1 * time.Hour,
|
||||
groupSize: 5,
|
||||
curationStates: fixture.Pairs{
|
||||
curationState{
|
||||
fingerprint: "0001-A-1-Z",
|
||||
ignoreYoungerThan: 1 * time.Hour,
|
||||
lastCurated: testInstant.Add(-1 * 30 * time.Minute),
|
||||
processor: &compactionProcessor{
|
||||
MinimumGroupSize: 5,
|
||||
MaximumMutationPoolBatch: 15,
|
||||
},
|
||||
},
|
||||
curationState{
|
||||
fingerprint: "0002-A-2-Z",
|
||||
ignoreYoungerThan: 1 * time.Hour,
|
||||
lastCurated: testInstant.Add(-1 * 90 * time.Minute),
|
||||
processor: &compactionProcessor{
|
||||
MinimumGroupSize: 5,
|
||||
MaximumMutationPoolBatch: 15,
|
||||
},
|
||||
},
|
||||
// This rule should effectively be ignored.
|
||||
curationState{
|
||||
fingerprint: "0002-A-2-Z",
|
||||
processor: &compactionProcessor{
|
||||
MinimumGroupSize: 2,
|
||||
MaximumMutationPoolBatch: 15,
|
||||
},
|
||||
ignoreYoungerThan: 30 * time.Minute,
|
||||
lastCurated: testInstant.Add(-1 * 90 * time.Minute),
|
||||
},
|
||||
},
|
||||
watermarkStates: fixture.Pairs{
|
||||
watermarkState{
|
||||
fingerprint: "0001-A-1-Z",
|
||||
lastAppended: testInstant.Add(-1 * 15 * time.Minute),
|
||||
},
|
||||
watermarkState{
|
||||
fingerprint: "0002-A-2-Z",
|
||||
lastAppended: testInstant.Add(-1 * 15 * time.Minute),
|
||||
},
|
||||
},
|
||||
sampleGroups: fixture.Pairs{
|
||||
sampleGroup{
|
||||
fingerprint: "0001-A-1-Z",
|
||||
values: model.Values{
|
||||
{
|
||||
Timestamp: testInstant.Add(-1 * 90 * time.Minute),
|
||||
Value: 0,
|
||||
},
|
||||
{
|
||||
Timestamp: testInstant.Add(-1 * 85 * time.Minute),
|
||||
Value: 1,
|
||||
},
|
||||
{
|
||||
Timestamp: testInstant.Add(-1 * 80 * time.Minute),
|
||||
Value: 2,
|
||||
},
|
||||
{
|
||||
Timestamp: testInstant.Add(-1 * 75 * time.Minute),
|
||||
Value: 3,
|
||||
},
|
||||
{
|
||||
Timestamp: testInstant.Add(-1 * 70 * time.Minute),
|
||||
Value: 4,
|
||||
},
|
||||
},
|
||||
},
|
||||
sampleGroup{
|
||||
fingerprint: "0001-A-1-Z",
|
||||
values: model.Values{
|
||||
{
|
||||
Timestamp: testInstant.Add(-1 * 65 * time.Minute),
|
||||
Value: 0.25,
|
||||
},
|
||||
{
|
||||
Timestamp: testInstant.Add(-1 * 60 * time.Minute),
|
||||
Value: 1.25,
|
||||
},
|
||||
{
|
||||
Timestamp: testInstant.Add(-1 * 55 * time.Minute),
|
||||
Value: 2.25,
|
||||
},
|
||||
{
|
||||
Timestamp: testInstant.Add(-1 * 50 * time.Minute),
|
||||
Value: 3.25,
|
||||
},
|
||||
{
|
||||
Timestamp: testInstant.Add(-1 * 45 * time.Minute),
|
||||
Value: 4.25,
|
||||
},
|
||||
},
|
||||
},
|
||||
sampleGroup{
|
||||
fingerprint: "0001-A-1-Z",
|
||||
values: model.Values{
|
||||
{
|
||||
Timestamp: testInstant.Add(-1 * 40 * time.Minute),
|
||||
Value: 0.50,
|
||||
},
|
||||
{
|
||||
Timestamp: testInstant.Add(-1 * 35 * time.Minute),
|
||||
Value: 1.50,
|
||||
},
|
||||
{
|
||||
Timestamp: testInstant.Add(-1 * 30 * time.Minute),
|
||||
Value: 2.50,
|
||||
},
|
||||
},
|
||||
},
|
||||
sampleGroup{
|
||||
fingerprint: "0001-A-1-Z",
|
||||
values: model.Values{
|
||||
{
|
||||
Timestamp: testInstant.Add(-1 * 25 * time.Minute),
|
||||
Value: 0.75,
|
||||
},
|
||||
},
|
||||
},
|
||||
sampleGroup{
|
||||
fingerprint: "0001-A-1-Z",
|
||||
values: model.Values{
|
||||
{
|
||||
Timestamp: testInstant.Add(-1 * 20 * time.Minute),
|
||||
Value: -2,
|
||||
},
|
||||
},
|
||||
},
|
||||
sampleGroup{
|
||||
fingerprint: "0001-A-1-Z",
|
||||
values: model.Values{
|
||||
{
|
||||
Timestamp: testInstant.Add(-1 * 15 * time.Minute),
|
||||
Value: -3,
|
||||
},
|
||||
},
|
||||
},
|
||||
sampleGroup{
|
||||
// Moved into Block 1
|
||||
fingerprint: "0002-A-2-Z",
|
||||
values: model.Values{
|
||||
{
|
||||
Timestamp: testInstant.Add(-1 * 90 * time.Minute),
|
||||
Value: 0,
|
||||
},
|
||||
},
|
||||
},
|
||||
sampleGroup{
|
||||
// Moved into Block 1
|
||||
fingerprint: "0002-A-2-Z",
|
||||
values: model.Values{
|
||||
{
|
||||
Timestamp: testInstant.Add(-1 * 89 * time.Minute),
|
||||
Value: 1,
|
||||
},
|
||||
},
|
||||
},
|
||||
sampleGroup{
|
||||
// Moved into Block 1
|
||||
fingerprint: "0002-A-2-Z",
|
||||
values: model.Values{
|
||||
{
|
||||
Timestamp: testInstant.Add(-1 * 88 * time.Minute),
|
||||
Value: 2,
|
||||
},
|
||||
},
|
||||
},
|
||||
sampleGroup{
|
||||
// Moved into Block 1
|
||||
fingerprint: "0002-A-2-Z",
|
||||
values: model.Values{
|
||||
{
|
||||
Timestamp: testInstant.Add(-1 * 87 * time.Minute),
|
||||
Value: 3,
|
||||
},
|
||||
},
|
||||
},
|
||||
sampleGroup{
|
||||
// Moved into Block 1
|
||||
fingerprint: "0002-A-2-Z",
|
||||
values: model.Values{
|
||||
{
|
||||
Timestamp: testInstant.Add(-1 * 86 * time.Minute),
|
||||
Value: 4,
|
||||
},
|
||||
},
|
||||
},
|
||||
sampleGroup{
|
||||
// Moved into Block 2
|
||||
fingerprint: "0002-A-2-Z",
|
||||
values: model.Values{
|
||||
{
|
||||
Timestamp: testInstant.Add(-1 * 85 * time.Minute),
|
||||
Value: 5,
|
||||
},
|
||||
},
|
||||
},
|
||||
sampleGroup{
|
||||
// Moved into Block 2
|
||||
fingerprint: "0002-A-2-Z",
|
||||
values: model.Values{
|
||||
{
|
||||
Timestamp: testInstant.Add(-1 * 84 * time.Minute),
|
||||
Value: 6,
|
||||
},
|
||||
},
|
||||
},
|
||||
sampleGroup{
|
||||
// Moved into Block 2
|
||||
fingerprint: "0002-A-2-Z",
|
||||
values: model.Values{
|
||||
{
|
||||
Timestamp: testInstant.Add(-1 * 83 * time.Minute),
|
||||
Value: 7,
|
||||
},
|
||||
},
|
||||
},
|
||||
sampleGroup{
|
||||
// Moved into Block 2
|
||||
fingerprint: "0002-A-2-Z",
|
||||
values: model.Values{
|
||||
{
|
||||
Timestamp: testInstant.Add(-1 * 82 * time.Minute),
|
||||
Value: 8,
|
||||
},
|
||||
},
|
||||
},
|
||||
sampleGroup{
|
||||
// Moved into Block 2
|
||||
fingerprint: "0002-A-2-Z",
|
||||
values: model.Values{
|
||||
{
|
||||
Timestamp: testInstant.Add(-1 * 81 * time.Minute),
|
||||
Value: 9,
|
||||
},
|
||||
},
|
||||
},
|
||||
sampleGroup{
|
||||
// Moved into Block 3
|
||||
fingerprint: "0002-A-2-Z",
|
||||
values: model.Values{
|
||||
{
|
||||
Timestamp: testInstant.Add(-1 * 80 * time.Minute),
|
||||
Value: 10,
|
||||
},
|
||||
},
|
||||
},
|
||||
sampleGroup{
|
||||
// Moved into Block 3
|
||||
fingerprint: "0002-A-2-Z",
|
||||
values: model.Values{
|
||||
{
|
||||
Timestamp: testInstant.Add(-1 * 79 * time.Minute),
|
||||
Value: 11,
|
||||
},
|
||||
},
|
||||
},
|
||||
sampleGroup{
|
||||
// Moved into Block 3
|
||||
fingerprint: "0002-A-2-Z",
|
||||
values: model.Values{
|
||||
{
|
||||
Timestamp: testInstant.Add(-1 * 78 * time.Minute),
|
||||
Value: 12,
|
||||
},
|
||||
},
|
||||
},
|
||||
sampleGroup{
|
||||
// Moved into Block 3
|
||||
fingerprint: "0002-A-2-Z",
|
||||
values: model.Values{
|
||||
{
|
||||
Timestamp: testInstant.Add(-1 * 77 * time.Minute),
|
||||
Value: 13,
|
||||
},
|
||||
},
|
||||
},
|
||||
sampleGroup{
|
||||
// Moved into Blocks 3 and 4 and 5
|
||||
fingerprint: "0002-A-2-Z",
|
||||
values: model.Values{
|
||||
{
|
||||
// Block 3
|
||||
Timestamp: testInstant.Add(-1 * 76 * time.Minute),
|
||||
Value: 14,
|
||||
},
|
||||
{
|
||||
// Block 4
|
||||
Timestamp: testInstant.Add(-1 * 75 * time.Minute),
|
||||
Value: 15,
|
||||
},
|
||||
{
|
||||
// Block 4
|
||||
Timestamp: testInstant.Add(-1 * 74 * time.Minute),
|
||||
Value: 16,
|
||||
},
|
||||
{
|
||||
// Block 4
|
||||
Timestamp: testInstant.Add(-1 * 73 * time.Minute),
|
||||
Value: 17,
|
||||
},
|
||||
{
|
||||
// Block 4
|
||||
Timestamp: testInstant.Add(-1 * 72 * time.Minute),
|
||||
Value: 18,
|
||||
},
|
||||
{
|
||||
// Block 4
|
||||
Timestamp: testInstant.Add(-1 * 71 * time.Minute),
|
||||
Value: 19,
|
||||
},
|
||||
{
|
||||
// Block 5
|
||||
Timestamp: testInstant.Add(-1 * 70 * time.Minute),
|
||||
Value: 20,
|
||||
},
|
||||
},
|
||||
},
|
||||
sampleGroup{
|
||||
// Moved into Block 5
|
||||
fingerprint: "0002-A-2-Z",
|
||||
values: model.Values{
|
||||
{
|
||||
Timestamp: testInstant.Add(-1 * 69 * time.Minute),
|
||||
Value: 21,
|
||||
},
|
||||
},
|
||||
},
|
||||
sampleGroup{
|
||||
// Moved into Block 5
|
||||
fingerprint: "0002-A-2-Z",
|
||||
values: model.Values{
|
||||
{
|
||||
Timestamp: testInstant.Add(-1 * 68 * time.Minute),
|
||||
Value: 22,
|
||||
},
|
||||
},
|
||||
},
|
||||
sampleGroup{
|
||||
// Moved into Block 5
|
||||
fingerprint: "0002-A-2-Z",
|
||||
values: model.Values{
|
||||
{
|
||||
Timestamp: testInstant.Add(-1 * 67 * time.Minute),
|
||||
Value: 23,
|
||||
},
|
||||
},
|
||||
},
|
||||
sampleGroup{
|
||||
// Moved into Block 5
|
||||
fingerprint: "0002-A-2-Z",
|
||||
values: model.Values{
|
||||
{
|
||||
Timestamp: testInstant.Add(-1 * 66 * time.Minute),
|
||||
Value: 24,
|
||||
},
|
||||
},
|
||||
},
|
||||
sampleGroup{
|
||||
// Moved into Block 6
|
||||
fingerprint: "0002-A-2-Z",
|
||||
values: model.Values{
|
||||
{
|
||||
Timestamp: testInstant.Add(-1 * 65 * time.Minute),
|
||||
Value: 25,
|
||||
},
|
||||
},
|
||||
},
|
||||
sampleGroup{
|
||||
// Moved into Block 6
|
||||
fingerprint: "0002-A-2-Z",
|
||||
values: model.Values{
|
||||
{
|
||||
Timestamp: testInstant.Add(-1 * 64 * time.Minute),
|
||||
Value: 26,
|
||||
},
|
||||
},
|
||||
},
|
||||
sampleGroup{
|
||||
// Moved into Block 6
|
||||
fingerprint: "0002-A-2-Z",
|
||||
values: model.Values{
|
||||
{
|
||||
Timestamp: testInstant.Add(-1 * 63 * time.Minute),
|
||||
Value: 27,
|
||||
},
|
||||
},
|
||||
},
|
||||
sampleGroup{
|
||||
// Moved into Block 6
|
||||
fingerprint: "0002-A-2-Z",
|
||||
values: model.Values{
|
||||
{
|
||||
Timestamp: testInstant.Add(-1 * 62 * time.Minute),
|
||||
Value: 28,
|
||||
},
|
||||
},
|
||||
},
|
||||
sampleGroup{
|
||||
// Moved into Block 6
|
||||
fingerprint: "0002-A-2-Z",
|
||||
values: model.Values{
|
||||
{
|
||||
Timestamp: testInstant.Add(-1 * 61 * time.Minute),
|
||||
Value: 29,
|
||||
},
|
||||
},
|
||||
},
|
||||
sampleGroup{
|
||||
// Moved into Block 7
|
||||
fingerprint: "0002-A-2-Z",
|
||||
values: model.Values{
|
||||
{
|
||||
Timestamp: testInstant.Add(-1 * 60 * time.Minute),
|
||||
Value: 30,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
out: out{
|
||||
curationStates: []curationState{
|
||||
{
|
||||
fingerprint: "0001-A-1-Z",
|
||||
ignoreYoungerThan: time.Hour,
|
||||
lastCurated: testInstant.Add(-1 * 30 * time.Minute),
|
||||
processor: &compactionProcessor{
|
||||
MinimumGroupSize: 5,
|
||||
MaximumMutationPoolBatch: 15,
|
||||
},
|
||||
},
|
||||
{
|
||||
fingerprint: "0002-A-2-Z",
|
||||
ignoreYoungerThan: 30 * time.Minute,
|
||||
lastCurated: testInstant.Add(-1 * 90 * time.Minute),
|
||||
processor: &compactionProcessor{
|
||||
MinimumGroupSize: 2,
|
||||
MaximumMutationPoolBatch: 15,
|
||||
},
|
||||
},
|
||||
{
|
||||
fingerprint: "0002-A-2-Z",
|
||||
ignoreYoungerThan: time.Hour,
|
||||
lastCurated: testInstant.Add(-1 * 60 * time.Minute),
|
||||
processor: &compactionProcessor{
|
||||
MinimumGroupSize: 5,
|
||||
MaximumMutationPoolBatch: 15,
|
||||
},
|
||||
},
|
||||
},
|
||||
sampleGroups: []sampleGroup{
|
||||
{
|
||||
fingerprint: "0001-A-1-Z",
|
||||
values: model.Values{
|
||||
{
|
||||
Timestamp: testInstant.Add(-1 * 90 * time.Minute),
|
||||
Value: 0,
|
||||
},
|
||||
{
|
||||
Timestamp: testInstant.Add(-1 * 85 * time.Minute),
|
||||
Value: 1,
|
||||
},
|
||||
{
|
||||
Timestamp: testInstant.Add(-1 * 80 * time.Minute),
|
||||
Value: 2,
|
||||
},
|
||||
{
|
||||
Timestamp: testInstant.Add(-1 * 75 * time.Minute),
|
||||
Value: 3,
|
||||
},
|
||||
{
|
||||
Timestamp: testInstant.Add(-1 * 70 * time.Minute),
|
||||
Value: 4,
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
fingerprint: "0001-A-1-Z",
|
||||
values: model.Values{
|
||||
{
|
||||
Timestamp: testInstant.Add(-1 * 65 * time.Minute),
|
||||
Value: 0.25,
|
||||
},
|
||||
{
|
||||
Timestamp: testInstant.Add(-1 * 60 * time.Minute),
|
||||
Value: 1.25,
|
||||
},
|
||||
{
|
||||
Timestamp: testInstant.Add(-1 * 55 * time.Minute),
|
||||
Value: 2.25,
|
||||
},
|
||||
{
|
||||
Timestamp: testInstant.Add(-1 * 50 * time.Minute),
|
||||
Value: 3.25,
|
||||
},
|
||||
{
|
||||
Timestamp: testInstant.Add(-1 * 45 * time.Minute),
|
||||
Value: 4.25,
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
fingerprint: "0001-A-1-Z",
|
||||
values: model.Values{
|
||||
{
|
||||
Timestamp: testInstant.Add(-1 * 40 * time.Minute),
|
||||
Value: 0.50,
|
||||
},
|
||||
{
|
||||
Timestamp: testInstant.Add(-1 * 35 * time.Minute),
|
||||
Value: 1.50,
|
||||
},
|
||||
{
|
||||
Timestamp: testInstant.Add(-1 * 30 * time.Minute),
|
||||
Value: 2.50,
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
fingerprint: "0001-A-1-Z",
|
||||
values: model.Values{
|
||||
{
|
||||
Timestamp: testInstant.Add(-1 * 25 * time.Minute),
|
||||
Value: 0.75,
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
fingerprint: "0001-A-1-Z",
|
||||
values: model.Values{
|
||||
{
|
||||
Timestamp: testInstant.Add(-1 * 20 * time.Minute),
|
||||
Value: -2,
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
fingerprint: "0001-A-1-Z",
|
||||
values: model.Values{
|
||||
{
|
||||
Timestamp: testInstant.Add(-1 * 15 * time.Minute),
|
||||
Value: -3,
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
// Block 1
|
||||
fingerprint: "0002-A-2-Z",
|
||||
values: model.Values{
|
||||
{
|
||||
Timestamp: testInstant.Add(-1 * 90 * time.Minute),
|
||||
Value: 0,
|
||||
},
|
||||
{
|
||||
Timestamp: testInstant.Add(-1 * 89 * time.Minute),
|
||||
Value: 1,
|
||||
},
|
||||
{
|
||||
Timestamp: testInstant.Add(-1 * 88 * time.Minute),
|
||||
Value: 2,
|
||||
},
|
||||
{
|
||||
Timestamp: testInstant.Add(-1 * 87 * time.Minute),
|
||||
Value: 3,
|
||||
},
|
||||
{
|
||||
Timestamp: testInstant.Add(-1 * 86 * time.Minute),
|
||||
Value: 4,
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
// Block 2
|
||||
fingerprint: "0002-A-2-Z",
|
||||
values: model.Values{
|
||||
{
|
||||
Timestamp: testInstant.Add(-1 * 85 * time.Minute),
|
||||
Value: 5,
|
||||
},
|
||||
{
|
||||
Timestamp: testInstant.Add(-1 * 84 * time.Minute),
|
||||
Value: 6,
|
||||
},
|
||||
{
|
||||
Timestamp: testInstant.Add(-1 * 83 * time.Minute),
|
||||
Value: 7,
|
||||
},
|
||||
{
|
||||
Timestamp: testInstant.Add(-1 * 82 * time.Minute),
|
||||
Value: 8,
|
||||
},
|
||||
{
|
||||
Timestamp: testInstant.Add(-1 * 81 * time.Minute),
|
||||
Value: 9,
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
// Block 3
|
||||
fingerprint: "0002-A-2-Z",
|
||||
values: model.Values{
|
||||
{
|
||||
Timestamp: testInstant.Add(-1 * 80 * time.Minute),
|
||||
Value: 10,
|
||||
},
|
||||
{
|
||||
Timestamp: testInstant.Add(-1 * 79 * time.Minute),
|
||||
Value: 11,
|
||||
},
|
||||
{
|
||||
Timestamp: testInstant.Add(-1 * 78 * time.Minute),
|
||||
Value: 12,
|
||||
},
|
||||
{
|
||||
Timestamp: testInstant.Add(-1 * 77 * time.Minute),
|
||||
Value: 13,
|
||||
},
|
||||
{
|
||||
Timestamp: testInstant.Add(-1 * 76 * time.Minute),
|
||||
Value: 14,
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
fingerprint: "0002-A-2-Z",
|
||||
values: model.Values{
|
||||
{
|
||||
Timestamp: testInstant.Add(-1 * 75 * time.Minute),
|
||||
Value: 15,
|
||||
},
|
||||
{
|
||||
Timestamp: testInstant.Add(-1 * 74 * time.Minute),
|
||||
Value: 16,
|
||||
},
|
||||
{
|
||||
Timestamp: testInstant.Add(-1 * 73 * time.Minute),
|
||||
Value: 17,
|
||||
},
|
||||
{
|
||||
Timestamp: testInstant.Add(-1 * 72 * time.Minute),
|
||||
Value: 18,
|
||||
},
|
||||
{
|
||||
Timestamp: testInstant.Add(-1 * 71 * time.Minute),
|
||||
Value: 19,
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
fingerprint: "0002-A-2-Z",
|
||||
values: model.Values{
|
||||
{
|
||||
Timestamp: testInstant.Add(-1 * 70 * time.Minute),
|
||||
Value: 20,
|
||||
},
|
||||
{
|
||||
Timestamp: testInstant.Add(-1 * 69 * time.Minute),
|
||||
Value: 21,
|
||||
},
|
||||
{
|
||||
Timestamp: testInstant.Add(-1 * 68 * time.Minute),
|
||||
Value: 22,
|
||||
},
|
||||
{
|
||||
Timestamp: testInstant.Add(-1 * 67 * time.Minute),
|
||||
Value: 23,
|
||||
},
|
||||
{
|
||||
Timestamp: testInstant.Add(-1 * 66 * time.Minute),
|
||||
Value: 24,
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
fingerprint: "0002-A-2-Z",
|
||||
values: model.Values{
|
||||
{
|
||||
Timestamp: testInstant.Add(-1 * 65 * time.Minute),
|
||||
Value: 25,
|
||||
},
|
||||
{
|
||||
Timestamp: testInstant.Add(-1 * 64 * time.Minute),
|
||||
Value: 26,
|
||||
},
|
||||
{
|
||||
Timestamp: testInstant.Add(-1 * 63 * time.Minute),
|
||||
Value: 27,
|
||||
},
|
||||
{
|
||||
Timestamp: testInstant.Add(-1 * 62 * time.Minute),
|
||||
Value: 28,
|
||||
},
|
||||
{
|
||||
Timestamp: testInstant.Add(-1 * 61 * time.Minute),
|
||||
Value: 29,
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
fingerprint: "0002-A-2-Z",
|
||||
values: model.Values{
|
||||
{
|
||||
Timestamp: testInstant.Add(-1 * 60 * time.Minute),
|
||||
Value: 30,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for i, scenario := range scenarios {
|
||||
curatorDirectory := fixture.NewPreparer(t).Prepare("curator", fixture.NewCassetteFactory(scenario.in.curationStates))
|
||||
defer curatorDirectory.Close()
|
||||
|
||||
watermarkDirectory := fixture.NewPreparer(t).Prepare("watermark", fixture.NewCassetteFactory(scenario.in.watermarkStates))
|
||||
defer watermarkDirectory.Close()
|
||||
|
||||
sampleDirectory := fixture.NewPreparer(t).Prepare("sample", fixture.NewCassetteFactory(scenario.in.sampleGroups))
|
||||
defer sampleDirectory.Close()
|
||||
|
||||
curatorStates, err := leveldb.NewLevelDBPersistence(curatorDirectory.Path(), 0, 0)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer curatorStates.Close()
|
||||
|
||||
watermarkStates, err := leveldb.NewLevelDBPersistence(watermarkDirectory.Path(), 0, 0)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer watermarkStates.Close()
|
||||
|
||||
samples, err := leveldb.NewLevelDBPersistence(sampleDirectory.Path(), 0, 0)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer samples.Close()
|
||||
|
||||
c := newCurator()
|
||||
err = c.run(scenario.in.ignoreYoungerThan, testInstant, scenario.in.processor, curatorStates, samples, watermarkStates)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
iterator := curatorStates.NewIterator(true)
|
||||
defer iterator.Close()
|
||||
|
||||
for j, expected := range scenario.out.curationStates {
|
||||
switch j {
|
||||
case 0:
|
||||
if !iterator.SeekToFirst() {
|
||||
t.Fatalf("%d.%d. could not seek to beginning.", i, j)
|
||||
}
|
||||
default:
|
||||
if !iterator.Next() {
|
||||
t.Fatalf("%d.%d. could not seek to next.", i, j)
|
||||
}
|
||||
}
|
||||
|
||||
curationKeyDto := &dto.CurationKey{}
|
||||
curationValueDto := &dto.CurationValue{}
|
||||
|
||||
err = proto.Unmarshal(iterator.Key(), curationKeyDto)
|
||||
if err != nil {
|
||||
t.Fatalf("%d.%d. could not unmarshal: %s\n", i, j, err)
|
||||
}
|
||||
err = proto.Unmarshal(iterator.Value(), curationValueDto)
|
||||
if err != nil {
|
||||
t.Fatalf("%d.%d. could not unmarshal: %s\n", i, j, err)
|
||||
}
|
||||
|
||||
curationKey := model.NewCurationKeyFromDTO(curationKeyDto)
|
||||
actualCurationRemark := model.NewCurationRemarkFromDTO(curationValueDto)
|
||||
|
||||
signature, err := expected.processor.Signature()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
actualKey := curationKey
|
||||
expectedKey := model.CurationKey{
|
||||
Fingerprint: model.NewFingerprintFromRowKey(expected.fingerprint),
|
||||
IgnoreYoungerThan: expected.ignoreYoungerThan,
|
||||
ProcessorMessageRaw: signature,
|
||||
ProcessorMessageTypeName: expected.processor.Name(),
|
||||
}
|
||||
if !actualKey.Equal(expectedKey) {
|
||||
t.Fatalf("%d.%d. expected %s, got %s", i, j, expectedKey, actualKey)
|
||||
}
|
||||
expectedCurationRemark := model.CurationRemark{
|
||||
LastCompletionTimestamp: expected.lastCurated,
|
||||
}
|
||||
if !actualCurationRemark.Equal(expectedCurationRemark) {
|
||||
t.Fatalf("%d.%d. expected %s, got %s", i, j, expectedCurationRemark, actualCurationRemark)
|
||||
}
|
||||
}
|
||||
|
||||
iterator = samples.NewIterator(true)
|
||||
defer iterator.Close()
|
||||
|
||||
for j, expected := range scenario.out.sampleGroups {
|
||||
switch j {
|
||||
case 0:
|
||||
if !iterator.SeekToFirst() {
|
||||
t.Fatalf("%d.%d. could not seek to beginning.", i, j)
|
||||
}
|
||||
default:
|
||||
if !iterator.Next() {
|
||||
t.Fatalf("%d.%d. could not seek to next, expected %s", i, j, expected)
|
||||
}
|
||||
}
|
||||
|
||||
sampleKey, err := extractSampleKey(iterator)
|
||||
if err != nil {
|
||||
t.Fatalf("%d.%d. error %s", i, j, err)
|
||||
}
|
||||
sampleValues, err := extractSampleValues(iterator)
|
||||
if err != nil {
|
||||
t.Fatalf("%d.%d. error %s", i, j, err)
|
||||
}
|
||||
|
||||
if !model.NewFingerprintFromRowKey(expected.fingerprint).Equal(sampleKey.Fingerprint) {
|
||||
t.Fatalf("%d.%d. expected fingerprint %s, got %s", i, j, expected.fingerprint, sampleKey.Fingerprint)
|
||||
}
|
||||
|
||||
if int(sampleKey.SampleCount) != len(expected.values) {
|
||||
t.Fatalf("%d.%d. expected %d values, got %d", i, j, len(expected.values), sampleKey.SampleCount)
|
||||
}
|
||||
|
||||
if len(sampleValues) != len(expected.values) {
|
||||
t.Fatalf("%d.%d. expected %d values, got %d", i, j, len(expected.values), len(sampleValues))
|
||||
}
|
||||
|
||||
if !sampleKey.FirstTimestamp.Equal(expected.values[0].Timestamp) {
|
||||
t.Fatalf("%d.%d. expected %s, got %s", i, j, expected.values[0].Timestamp, sampleKey.FirstTimestamp)
|
||||
}
|
||||
|
||||
for k, actualValue := range sampleValues {
|
||||
if expected.values[k].Value != actualValue.Value {
|
||||
t.Fatalf("%d.%d.%d. expected %d, got %d", i, j, k, expected.values[k].Value, actualValue.Value)
|
||||
}
|
||||
if !expected.values[k].Timestamp.Equal(actualValue.Timestamp) {
|
||||
t.Fatalf("%d.%d.%d. expected %s, got %s", i, j, k, expected.values[k].Timestamp, actualValue.Timestamp)
|
||||
}
|
||||
}
|
||||
|
||||
if !sampleKey.LastTimestamp.Equal(expected.values[len(expected.values)-1].Timestamp) {
|
||||
fmt.Println("last", sampleValues[len(expected.values)-1].Value, expected.values[len(expected.values)-1].Value)
|
||||
t.Errorf("%d.%d. expected %s, got %s", i, j, expected.values[len(expected.values)-1].Timestamp, sampleKey.LastTimestamp)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -14,30 +14,34 @@
|
|||
package leveldb
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/jmhodges/levigo"
|
||||
"github.com/prometheus/prometheus/coding"
|
||||
)
|
||||
|
||||
type batch struct {
|
||||
batch *levigo.WriteBatch
|
||||
drops uint32
|
||||
puts uint32
|
||||
}
|
||||
|
||||
func NewBatch() batch {
|
||||
return batch{
|
||||
func NewBatch() *batch {
|
||||
return &batch{
|
||||
batch: levigo.NewWriteBatch(),
|
||||
}
|
||||
}
|
||||
|
||||
func (b batch) Drop(key coding.Encoder) {
|
||||
func (b *batch) Drop(key coding.Encoder) {
|
||||
keyEncoded, err := key.Encode()
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
b.drops++
|
||||
|
||||
b.batch.Delete(keyEncoded)
|
||||
}
|
||||
|
||||
func (b batch) Put(key, value coding.Encoder) {
|
||||
func (b *batch) Put(key, value coding.Encoder) {
|
||||
keyEncoded, err := key.Encode()
|
||||
if err != nil {
|
||||
panic(err)
|
||||
|
@ -46,6 +50,7 @@ func (b batch) Put(key, value coding.Encoder) {
|
|||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
b.puts++
|
||||
|
||||
b.batch.Put(keyEncoded, valueEncoded)
|
||||
}
|
||||
|
@ -53,3 +58,7 @@ func (b batch) Put(key, value coding.Encoder) {
|
|||
func (b batch) Close() {
|
||||
b.batch.Close()
|
||||
}
|
||||
|
||||
func (b batch) String() string {
|
||||
return fmt.Sprintf("LevelDB batch with %d puts and %d drops.", b.puts, b.drops)
|
||||
}
|
||||
|
|
|
@ -298,7 +298,7 @@ func (l *LevelDBPersistence) Commit(b raw.Batch) (err error) {
|
|||
// tests, we could create a Batch struct that journals pending
|
||||
// operations which the given Persistence implementation could convert
|
||||
// to its specific commit requirements.
|
||||
batch, ok := b.(batch)
|
||||
batch, ok := b.(*batch)
|
||||
if !ok {
|
||||
panic("leveldb.batch expected")
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue