diff --git a/build_info.go b/build_info.go
index fe00fa49d..52cd47686 100644
--- a/build_info.go
+++ b/build_info.go
@@ -29,6 +29,8 @@ var (
 	snappyVersion string
 )
 
+// BuildInfo encapsulates compile-time metadata about Prometheus made available
+// via go tool ld so that it can be reported on demand.
 var BuildInfo = map[string]string{
 	"version": buildVersion,
 	"branch":  buildBranch,
diff --git a/model/Makefile b/model/Makefile
index 328b33f5e..7fe2c1dec 100644
--- a/model/Makefile
+++ b/model/Makefile
@@ -22,7 +22,7 @@ include ../Makefile.INCLUDE
 generated: generated-stamp
 
 generated-stamp: data.proto
-	protoc --proto_path=$(PREFIX)/include:. --go_out=generated/ --descriptor_set_out=generated/descriptor.blob data.proto
+	protoc --proto_path=$(PREFIX)/include:. --include_imports --go_out=generated/ --descriptor_set_out=generated/descriptor.blob data.proto
 	touch $@
 
 clean:
diff --git a/model/curation.go b/model/curation.go
index 26bcdf8e6..c7bb8594a 100644
--- a/model/curation.go
+++ b/model/curation.go
@@ -14,6 +14,9 @@ package model
 
 import (
+	"bytes"
+	"code.google.com/p/goprotobuf/proto"
+	"fmt"
 	dto "github.com/prometheus/prometheus/model/generated"
 	"time"
 )
@@ -24,16 +27,77 @@ type CurationRemark struct {
 	LastCompletionTimestamp time.Time
 }
 
-// OlderThanLimit answers whether this CurationRemark is older than the provided
-// cutOff time.
-func (c CurationRemark) OlderThanLimit(cutOff time.Time) bool {
-	return c.LastCompletionTimestamp.Before(cutOff)
+// OlderThan answers whether this CurationRemark is older than the provided
+// time.
+func (c CurationRemark) OlderThan(t time.Time) bool {
+	return c.LastCompletionTimestamp.Before(t)
+}
+
+// Equal answers whether the two CurationRemarks are equivalent.
+func (c CurationRemark) Equal(o CurationRemark) bool {
+	return c.LastCompletionTimestamp.Equal(o.LastCompletionTimestamp)
+}
+
+func (c CurationRemark) String() string {
+	return fmt.Sprintf("Last curated at %s", c.LastCompletionTimestamp)
+}
+
+// ToDTO generates the dto.CurationValue representation of this.
+func (c CurationRemark) ToDTO() *dto.CurationValue {
+	return &dto.CurationValue{
+		LastCompletionTimestamp: proto.Int64(c.LastCompletionTimestamp.Unix()),
+	}
 }
 
 // NewCurationRemarkFromDTO builds CurationRemark from the provided
 // dto.CurationValue object.
 func NewCurationRemarkFromDTO(d *dto.CurationValue) CurationRemark {
 	return CurationRemark{
-		LastCompletionTimestamp: time.Unix(*d.LastCompletionTimestamp, 0),
+		LastCompletionTimestamp: time.Unix(*d.LastCompletionTimestamp, 0).UTC(),
+	}
+}
+
+// CurationKey provides a representation of dto.CurationKey with associated
+// business logic methods attached to it to enhance code readability.
+type CurationKey struct {
+	Fingerprint              Fingerprint
+	ProcessorMessageRaw      []byte
+	ProcessorMessageTypeName string
+	IgnoreYoungerThan        time.Duration
+}
+
+// Equal answers whether the two CurationKeys are equivalent.
+func (c CurationKey) Equal(o CurationKey) (equal bool) {
+	switch {
+	case !c.Fingerprint.Equal(o.Fingerprint):
+		return
+	case bytes.Compare(c.ProcessorMessageRaw, o.ProcessorMessageRaw) != 0:
+		return
+	case c.ProcessorMessageTypeName != o.ProcessorMessageTypeName:
+		return
+	case c.IgnoreYoungerThan != o.IgnoreYoungerThan:
+		return
+	}
+
+	return true
+}
+
+// ToDTO generates a dto.CurationKey representation of this.
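+//
+// A round trip through the DTO form is intended to be lossless: for any key k,
+// NewCurationKeyFromDTO(k.ToDTO()).Equal(k) should hold, assuming the embedded
+// Fingerprint converts cleanly as well. A sketch of the invariant (fp being
+// any valid Fingerprint):
+//
+//   k := CurationKey{Fingerprint: fp, IgnoreYoungerThan: time.Hour}
+//   NewCurationKeyFromDTO(k.ToDTO()).Equal(k) // expected: true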
+func (c CurationKey) ToDTO() *dto.CurationKey {
+	return &dto.CurationKey{
+		Fingerprint:              c.Fingerprint.ToDTO(),
+		ProcessorMessageRaw:      c.ProcessorMessageRaw,
+		ProcessorMessageTypeName: proto.String(c.ProcessorMessageTypeName),
+		IgnoreYoungerThan:        proto.Int64(int64(c.IgnoreYoungerThan)),
+	}
+}
+
+// NewCurationKeyFromDTO builds CurationKey from the provided dto.CurationKey.
+func NewCurationKeyFromDTO(d *dto.CurationKey) CurationKey {
+	return CurationKey{
+		Fingerprint:              NewFingerprintFromDTO(d.Fingerprint),
+		ProcessorMessageRaw:      d.ProcessorMessageRaw,
+		ProcessorMessageTypeName: *d.ProcessorMessageTypeName,
+		IgnoreYoungerThan:        time.Duration(*d.IgnoreYoungerThan),
+	}
+}
diff --git a/model/data.proto b/model/data.proto
index 1678da473..e92fdec3b 100644
--- a/model/data.proto
+++ b/model/data.proto
@@ -65,6 +65,14 @@ message MetricHighWatermark {
   optional int64 timestamp = 1;
 }
 
+// CompactionProcessorDefinition models a curation process across the sample
+// corpus that combines sparse samples together into denser chunks.
+message CompactionProcessorDefinition {
+  // minimum_group_size identifies how minimally samples should be grouped
+  // together to write a new SampleValueSeries chunk.
+  optional uint32 minimum_group_size = 1;
+}
+
 // CurationKey models the state of curation for a given metric fingerprint and
 // its associated samples. The time series database only knows about compaction
 // and resampling behaviors that are explicitly defined to it in its runtime
@@ -73,19 +81,38 @@ message MetricHighWatermark {
 // effectuation state for a given metric fingerprint is.
 //
 // For instance, how far along a rule for (Fingerprint A, Samples Older Than
-// B, and Grouped Together in Size of C) has been effectuated on-disk.
+// B, and Curation Processor) has been effectuated on-disk.
 message CurationKey {
   // fingerprint identifies the fingerprint for the given policy.
-  optional Fingerprint fingerprint = 1;
-  // older_than represents in seconds relative to when curation cycle starts
-  // into the past when the curator should stop operating on a given metric
-  // fingerprint's samples:
+  optional Fingerprint fingerprint = 1;
+
+  // processor_message_type_name identifies the underlying message type that
+  // was used to encode processor_message_raw.
+  optional string processor_message_type_name = 2;
+
+  // processor_message_raw identifies the serialized ProcessorSignature for this
+  // operation.
+  optional bytes processor_message_raw = 3;
+
+  // ignore_younger_than represents, in seconds relative to when the curation
+  // cycle starts, when the curator should stop operating. For instance, if
+  // the curation cycle starts at time T and the curation remark dictates that
+  // curation should start processing samples at time S, the curator should
+  // work from S until ignore_younger_than seconds before T:
   //
-  // [Oldest Sample Time, time.Now().Sub(time.Second * older_than))
-  optional int64 older_than = 2;
-  // minimum_group_size identifies how minimally samples should be grouped
-  // together to write a new SampleValueSeries chunk.
-  optional uint32 minimum_group_size = 3;
+  //             PAST NOW FUTURE
+  //
+  // S--------------->|----------T
+  //                  |---IYT----|
+  //
+  // [Curation Resumption Time (S), T - IYT)
+  optional int64 ignore_younger_than = 4;
+
+  // This could be populated by decoding the generated descriptor file into a
+  // FileDescriptorSet message and extracting the type definition for the given
+  // message schema that describes processor_message_type_name.
+ // + // optional google.protobuf.DescriptorProto processor_message_type_descriptor_raw = 5; } // CurationValue models the progress for a given CurationKey. diff --git a/model/generated/.gitignore b/model/generated/.gitignore index 01d8818eb..e590fda01 100644 --- a/model/generated/.gitignore +++ b/model/generated/.gitignore @@ -1,2 +1,2 @@ data.pb.go -descriptor.blob +descriptor.blob \ No newline at end of file diff --git a/model/metric.go b/model/metric.go index 191fa3233..66fc8cdf6 100644 --- a/model/metric.go +++ b/model/metric.go @@ -92,7 +92,11 @@ func (s SampleValue) ToDTO() *float64 { } func (v SampleValue) MarshalJSON() ([]byte, error) { - return []byte(fmt.Sprintf("\"%f\"", v)), nil + return []byte(fmt.Sprintf(`"%f"`, v)), nil +} + +func (v SampleValue) String() string { + return fmt.Sprint(float64(v)) } func (s SamplePair) MarshalJSON() ([]byte, error) { @@ -108,6 +112,19 @@ func (s SamplePair) Equal(o SamplePair) bool { return s.Value.Equal(o.Value) && s.Timestamp.Equal(o.Timestamp) } +func (s SamplePair) ToDTO() (out *dto.SampleValueSeries_Value) { + out = &dto.SampleValueSeries_Value{ + Timestamp: proto.Int64(s.Timestamp.Unix()), + Value: s.Value.ToDTO(), + } + + return +} + +func (s SamplePair) String() string { + return fmt.Sprintf("SamplePair at %s of %s", s.Timestamp, s.Value) +} + type Values []SamplePair func (v Values) Len() int { @@ -174,6 +191,40 @@ func (v Values) TruncateBefore(t time.Time) (values Values) { return } +func (v Values) ToDTO() (out *dto.SampleValueSeries) { + out = &dto.SampleValueSeries{} + + for _, value := range v { + out.Value = append(out.Value, value.ToDTO()) + } + + return +} + +func (v Values) ToSampleKey(f Fingerprint) SampleKey { + return SampleKey{ + Fingerprint: f, + FirstTimestamp: v[0].Timestamp, + LastTimestamp: v[len(v)-1].Timestamp, + SampleCount: uint32(len(v)), + } +} + +func (v Values) String() string { + buffer := bytes.Buffer{} + + fmt.Fprintf(&buffer, "[") + for i, value := range v { + fmt.Fprintf(&buffer, "%d. %s", i, value) + if i != len(v)-1 { + fmt.Fprintf(&buffer, "\n") + } + } + fmt.Fprintf(&buffer, "]") + + return buffer.String() +} + func NewValuesFromDTO(dto *dto.SampleValueSeries) (v Values) { for _, value := range dto.Value { v = append(v, SamplePair{ diff --git a/model/samplekey.go b/model/samplekey.go index ed9be185c..103fefecd 100644 --- a/model/samplekey.go +++ b/model/samplekey.go @@ -15,6 +15,7 @@ package model import ( "code.google.com/p/goprotobuf/proto" + "fmt" "github.com/prometheus/prometheus/coding/indexable" dto "github.com/prometheus/prometheus/model/generated" "time" @@ -67,6 +68,10 @@ func (s SampleKey) ToPartialDTO(out *dto.SampleKey) { return } +func (s SampleKey) String() string { + return fmt.Sprintf("SampleKey for %s at %s to %s with %d values.", s.Fingerprint, s.FirstTimestamp, s.LastTimestamp, s.SampleCount) +} + // NewSampleKeyFromDTO builds a new SampleKey from a provided data-transfer // object. func NewSampleKeyFromDTO(dto *dto.SampleKey) SampleKey { diff --git a/rules/alerting.go b/rules/alerting.go index b26c4900e..a6b4da7ed 100644 --- a/rules/alerting.go +++ b/rules/alerting.go @@ -41,11 +41,11 @@ const ( // alert is used to track active (pending/firing) alerts over time. type alert struct { // The name of the alert. - name string + name string // The vector element labelset triggering this alert. - metric model.Metric + metric model.Metric // The state of the alert (PENDING or FIRING). 
-	state alertState
+	state       alertState
 	// The time when the alert first transitioned into PENDING state.
 	activeSince time.Time
 }
@@ -71,14 +71,14 @@ func (a alert) sample(timestamp time.Time, value model.SampleValue) model.Sample
 // An alerting rule generates alerts from its vector expression.
 type AlertingRule struct {
 	// The name of the alert.
-	name string
-	// The vector expression from which to generate alerts.
-	vector ast.VectorNode
+	name         string
+	// The vector expression from which to generate alerts.
+	vector       ast.VectorNode
 	// The duration for which a labelset needs to persist in the expression
 	// output vector before an alert transitions from PENDING to FIRING state.
 	holdDuration time.Duration
 	// Extra labels to attach to the resulting alert sample vectors.
-	labels model.LabelSet
+	labels       model.LabelSet
 	// A map of alerts which are currently active (PENDING or FIRING), keyed by
 	// the fingerprint of the labelset they correspond to.
 	activeAlerts map[model.Fingerprint]*alert
diff --git a/storage/interface.go b/storage/interface.go
index 2bc1b72d3..44361c354 100644
--- a/storage/interface.go
+++ b/storage/interface.go
@@ -34,6 +34,19 @@ const (
 	ACCEPT
 )
 
+func (f FilterResult) String() string {
+	switch f {
+	case STOP:
+		return "STOP"
+	case SKIP:
+		return "SKIP"
+	case ACCEPT:
+		return "ACCEPT"
+	}
+
+	panic("unknown")
+}
+
 type OperatorErrorType int
 
 type OperatorError struct {
diff --git a/storage/metric/curator.go b/storage/metric/curator.go
index 592f4ae87..e372b9f8c 100644
--- a/storage/metric/curator.go
+++ b/storage/metric/curator.go
@@ -15,14 +15,36 @@ package metric
 
 import (
 	"code.google.com/p/goprotobuf/proto"
+	"fmt"
 	"github.com/prometheus/prometheus/coding"
 	"github.com/prometheus/prometheus/model"
 	dto "github.com/prometheus/prometheus/model/generated"
 	"github.com/prometheus/prometheus/storage"
 	"github.com/prometheus/prometheus/storage/raw"
+	"github.com/prometheus/prometheus/storage/raw/leveldb"
+	"strings"
 	"time"
 )
 
+// watermarkFilter determines whether to include or exclude candidate
+// values from the curation process by virtue of how old the high watermark is.
+type watermarkFilter struct {
+	// curationState is the data store for curation remarks.
+	curationState raw.Persistence
+	// ignoreYoungerThan conveys this filter's policy of not working on elements
+	// younger than a given relative time duration. This is persisted to the
+	// curation remark database (curationState) to indicate how far a given
+	// policy of this type has progressed.
+	ignoreYoungerThan time.Duration
+	// processor is the post-processor that performs whatever action is desired on
+	// the data that is deemed valid to be worked on.
+	processor processor
+	// stop functions as the global stop channel for all future operations.
+	stop chan bool
+	// stopAt is used to determine the eligibility of series for compaction.
+	stopAt time.Time
+}
+
 // curator is responsible for effectuating a given curation policy across the
 // stored samples on-disk. This is useful to compact sparse sample values into
 // single sample entities to reduce keyspace load on the datastore.
@@ -31,51 +53,103 @@ type curator struct {
 	// The moment a value is ingested inside of it, the curator goes into drain
 	// mode.
 	stop chan bool
-	// samples is the on-disk metric store that is scanned for compaction
-	// candidates.
-	samples raw.Persistence
-	// watermarks is the on-disk store that is scanned for high watermarks for
-	// given metrics.
-	watermarks raw.Persistence
-	// recencyThreshold represents the most recent time up to which values will be
-	// curated.
-	recencyThreshold time.Duration
-	// groupingQuantity represents the number of samples below which encountered
-	// samples will be dismembered and reaggregated into larger groups.
-	groupingQuantity uint32
-	// curationState is the on-disk store where the curation remarks are made for
-	// how much progress has been made.
+}
+
+// watermarkDecoder converts (dto.Fingerprint, dto.MetricHighWatermark) doubles
+// into (model.Fingerprint, model.Watermark) doubles.
+type watermarkDecoder struct{}
+
+// watermarkOperator scans over the curator.samples table for metrics whose
+// high watermark has been determined to be allowable for curation. This type
+// is individually responsible for compaction.
+//
+// The scanning starts from CurationRemark.LastCompletionTimestamp and goes
+// forward until the stop point or end of the series is reached.
+type watermarkOperator struct {
+	// curationState is the data store for curation remarks.
 	curationState raw.Persistence
+	// diskFrontier models the available seekable ranges for the provided
+	// sampleIterator.
+	diskFrontier diskFrontier
+	// ignoreYoungerThan is passed into the curation remark for the given series.
+	ignoreYoungerThan time.Duration
+	// processor is responsible for executing a given strategy on the
+	// to-be-operated-on series.
+	processor processor
+	// sampleIterator is a snapshotted iterator for the time series.
+	sampleIterator leveldb.Iterator
+	// samples is the on-disk metric store that is scanned for compaction
+	// candidates.
+	samples raw.Persistence
+	// stopAt is a cue for when to stop mutating a given series.
+	stopAt time.Time
 }
 
-// newCurator builds a new curator for the given LevelDB databases.
-func newCurator(recencyThreshold time.Duration, groupingQuantity uint32, curationState, samples, watermarks raw.Persistence) curator {
+// newCurator builds a new curator.
+func newCurator() curator {
 	return curator{
-		recencyThreshold: recencyThreshold,
-		stop:             make(chan bool),
-		samples:          samples,
-		curationState:    curationState,
-		watermarks:       watermarks,
-		groupingQuantity: groupingQuantity,
+		stop: make(chan bool),
 	}
 }
 
 // run facilitates the curation lifecycle.
-func (c curator) run(instant time.Time) (err error) {
-	decoder := watermarkDecoder{}
-	filter := watermarkFilter{
-		stop:             c.stop,
-		curationState:    c.curationState,
-		groupSize:        c.groupingQuantity,
-		recencyThreshold: c.recencyThreshold,
+//
+// ignoreYoungerThan represents the most recent time up to which values will be
+// curated.
+// curationState is the on-disk store where the curation remarks are made for
+// how much progress has been made.
+func (c curator) run(ignoreYoungerThan time.Duration, instant time.Time, processor processor, curationState, samples, watermarks *leveldb.LevelDBPersistence) (err error) {
+	defer func(t time.Time) {
+		duration := float64(time.Since(t))
+
+		labels := map[string]string{
+			cutOff:        fmt.Sprint(ignoreYoungerThan),
+			processorName: processor.Name(),
+			result:        success,
+		}
+		if err != nil {
+			labels[result] = failure
+		}
+
+		curationDuration.IncrementBy(labels, duration)
+		curationDurations.Add(labels, duration)
+	}(time.Now())
+
+	iterator := samples.NewIterator(true)
+	defer iterator.Close()
+
+	diskFrontier, err := newDiskFrontier(iterator)
+	if err != nil {
+		return
 	}
-	operator := watermarkOperator{
-		olderThan:     instant.Add(-1 * c.recencyThreshold),
-		groupSize:     c.groupingQuantity,
-		curationState: c.curationState,
+	if diskFrontier == nil {
+		// No sample database exists; no work to do!
+ return } - _, err = c.watermarks.ForEach(decoder, filter, operator) + decoder := watermarkDecoder{} + + filter := watermarkFilter{ + curationState: curationState, + processor: processor, + ignoreYoungerThan: ignoreYoungerThan, + stop: c.stop, + stopAt: instant.Add(-1 * ignoreYoungerThan), + } + + // Right now, the ability to stop a curation is limited to the beginning of + // each fingerprint cycle. It is impractical to cease the work once it has + // begun for a given series. + operator := watermarkOperator{ + curationState: curationState, + diskFrontier: *diskFrontier, + processor: processor, + ignoreYoungerThan: ignoreYoungerThan, + sampleIterator: iterator, + samples: samples, + stopAt: instant.Add(-1 * ignoreYoungerThan), + } + + _, err = watermarks.ForEach(decoder, filter, operator) return } @@ -88,35 +162,27 @@ func (c curator) drain() { } } -// watermarkDecoder converts (dto.Fingerprint, dto.MetricHighWatermark) doubles -// into (model.Fingerprint, model.Watermark) doubles. -type watermarkDecoder struct{} - func (w watermarkDecoder) DecodeKey(in interface{}) (out interface{}, err error) { - var ( - key = &dto.Fingerprint{} - bytes = in.([]byte) - ) + key := &dto.Fingerprint{} + bytes := in.([]byte) err = proto.Unmarshal(bytes, key) if err != nil { - panic(err) + return } - out = model.NewFingerprintFromRowKey(*key.Signature) + out = model.NewFingerprintFromDTO(key) return } func (w watermarkDecoder) DecodeValue(in interface{}) (out interface{}, err error) { - var ( - dto = &dto.MetricHighWatermark{} - bytes = in.([]byte) - ) + dto := &dto.MetricHighWatermark{} + bytes := in.([]byte) err = proto.Unmarshal(bytes, dto) if err != nil { - panic(err) + return } out = model.NewWatermarkFromHighWatermarkDTO(dto) @@ -124,149 +190,181 @@ func (w watermarkDecoder) DecodeValue(in interface{}) (out interface{}, err erro return } -// watermarkFilter determines whether to include or exclude candidate -// values from the curation process by virtue of how old the high watermark is. -type watermarkFilter struct { - // curationState is the table of CurationKey to CurationValues that rema - // far along the curation process has gone for a given metric fingerprint. - curationState raw.Persistence - // stop, when non-empty, instructs the filter to stop operation. - stop chan bool - // groupSize refers to the target groupSize from the curator. - groupSize uint32 - // recencyThreshold refers to the target recencyThreshold from the curator. 
- recencyThreshold time.Duration +func (w watermarkFilter) shouldStop() bool { + return len(w.stop) != 0 } -func (w watermarkFilter) Filter(key, value interface{}) (result storage.FilterResult) { - fingerprint := key.(model.Fingerprint) - watermark := value.(model.Watermark) - curationKey := &dto.CurationKey{ - Fingerprint: fingerprint.ToDTO(), - MinimumGroupSize: proto.Uint32(w.groupSize), - OlderThan: proto.Int64(int64(w.recencyThreshold)), +func getCurationRemark(states raw.Persistence, processor processor, ignoreYoungerThan time.Duration, fingerprint model.Fingerprint) (remark *model.CurationRemark, err error) { + rawSignature, err := processor.Signature() + if err != nil { + return } + + curationKey := model.CurationKey{ + Fingerprint: fingerprint, + ProcessorMessageRaw: rawSignature, + ProcessorMessageTypeName: processor.Name(), + IgnoreYoungerThan: ignoreYoungerThan, + }.ToDTO() curationValue := &dto.CurationValue{} - rawCurationValue, err := w.curationState.Get(coding.NewProtocolBuffer(curationKey)) + rawKey := coding.NewProtocolBuffer(curationKey) + + has, err := states.Has(rawKey) if err != nil { - panic(err) + return + } + if !has { + return + } + + rawCurationValue, err := states.Get(rawKey) + if err != nil { + return } err = proto.Unmarshal(rawCurationValue, curationValue) if err != nil { - panic(err) + return } - switch { - case model.NewCurationRemarkFromDTO(curationValue).OlderThanLimit(watermark.Time): - result = storage.ACCEPT - case len(w.stop) != 0: - result = storage.STOP - default: - result = storage.SKIP - } + baseRemark := model.NewCurationRemarkFromDTO(curationValue) + remark = &baseRemark return } -// watermarkOperator scans over the curator.samples table for metrics whose -// high watermark has been determined to be allowable for curation. This type -// is individually responsible for compaction. -// -// The scanning starts from CurationRemark.LastCompletionTimestamp and goes -// forward until the stop point or end of the series is reached. -type watermarkOperator struct { - // olderThan functions as the cutoff when scanning curator.samples for - // uncurated samples to compact. The operator scans forward in the samples - // until olderThan is reached and then stops operation for samples that occur - // after it. - olderThan time.Time - // groupSize is the target quantity of samples to group together for a given - // to-be-written sample. Observed samples of less than groupSize are combined - // up to groupSize if possible. The protocol does not define the behavior if - // observed chunks are larger than groupSize. - groupSize uint32 - // curationState is the table of CurationKey to CurationValues that remark on - // far along the curation process has gone for a given metric fingerprint. 
- curationState raw.Persistence -} +func (w watermarkFilter) Filter(key, value interface{}) (r storage.FilterResult) { + defer func() { + labels := map[string]string{ + cutOff: fmt.Sprint(w.ignoreYoungerThan), + result: strings.ToLower(r.String()), + processorName: w.processor.Name(), + } -func (w watermarkOperator) Operate(key, value interface{}) (err *storage.OperatorError) { - var ( - fingerprint = key.(model.Fingerprint) - watermark = value.(model.Watermark) - queryErr error - hasBeenCurated bool - curationConsistent bool - ) - hasBeenCurated, queryErr = w.hasBeenCurated(fingerprint) - if queryErr != nil { - err = &storage.OperatorError{queryErr, false} - return + curationFilterOperations.Increment(labels) + }() + + if w.shouldStop() { + return storage.STOP } - if !hasBeenCurated { - // curate + fingerprint := key.(model.Fingerprint) + curationRemark, err := getCurationRemark(w.curationState, w.processor, w.ignoreYoungerThan, fingerprint) + if err != nil { return } - - curationConsistent, queryErr = w.curationConsistent(fingerprint, watermark) - if queryErr != nil { - err = &storage.OperatorError{queryErr, false} + if curationRemark == nil { + r = storage.ACCEPT + return + } + if !curationRemark.OlderThan(w.stopAt) { + return storage.SKIP + } + watermark := value.(model.Watermark) + if !curationRemark.OlderThan(watermark.Time) { + return storage.SKIP + } + curationConsistent, err := w.curationConsistent(fingerprint, watermark) + if err != nil { return } if curationConsistent { - return + return storage.SKIP } - // curate - - return -} - -// hasBeenCurated answers true if the provided Fingerprint has been curated in -// in the past. -func (w watermarkOperator) hasBeenCurated(f model.Fingerprint) (curated bool, err error) { - curationKey := &dto.CurationKey{ - Fingerprint: f.ToDTO(), - OlderThan: proto.Int64(w.olderThan.Unix()), - MinimumGroupSize: proto.Uint32(w.groupSize), - } - - curated, err = w.curationState.Has(coding.NewProtocolBuffer(curationKey)) - - return + return storage.ACCEPT } // curationConsistent determines whether the given metric is in a dirty state // and needs curation. 
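+// "Consistent" here means the curation remark is at least as recent as the
+// series' high watermark, i.e. no new samples appear to have arrived since the
+// last curation pass, so there is nothing further to do.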
-func (w watermarkOperator) curationConsistent(f model.Fingerprint, watermark model.Watermark) (consistent bool, err error) {
-	var (
-		rawValue      []byte
-		curationValue = &dto.CurationValue{}
-		curationKey   = &dto.CurationKey{
-			Fingerprint:      f.ToDTO(),
-			OlderThan:        proto.Int64(w.olderThan.Unix()),
-			MinimumGroupSize: proto.Uint32(w.groupSize),
-		}
-	)
-
-	rawValue, err = w.curationState.Get(coding.NewProtocolBuffer(curationKey))
+func (w watermarkFilter) curationConsistent(f model.Fingerprint, watermark model.Watermark) (consistent bool, err error) {
+	curationRemark, err := getCurationRemark(w.curationState, w.processor, w.ignoreYoungerThan, f)
 	if err != nil {
 		return
 	}
-
-	err = proto.Unmarshal(rawValue, curationValue)
-	if err != nil {
-		return
-	}
-
-	curationRemark := model.NewCurationRemarkFromDTO(curationValue)
-	if !curationRemark.OlderThanLimit(watermark.Time) {
+	if !curationRemark.OlderThan(watermark.Time) {
 		consistent = true
-
-		return
 	}
 
 	return
 }
+
+func (w watermarkOperator) Operate(key, _ interface{}) (oErr *storage.OperatorError) {
+	fingerprint := key.(model.Fingerprint)
+
+	seriesFrontier, err := newSeriesFrontier(fingerprint, w.diskFrontier, w.sampleIterator)
+	if err != nil || seriesFrontier == nil {
+		// An anomaly with the series frontier is severe in the sense that some sort
+		// of an illegal state condition exists in the storage layer, which would
+		// probably signify an illegal disk frontier.
+		return &storage.OperatorError{err, false}
+	}
+
+	curationState, err := getCurationRemark(w.curationState, w.processor, w.ignoreYoungerThan, fingerprint)
+	if err != nil {
+		// An anomaly with the curation remark, such as a failure to decode the
+		// entity, is likely not fatal and shouldn't be cause to stop work. The
+		// process will simply start from a pessimistic work time and work
+		// forward. With an idempotent processor, this is safe.
+		return &storage.OperatorError{err, true}
+	}
+
+	startKey := model.SampleKey{
+		Fingerprint:    fingerprint,
+		FirstTimestamp: seriesFrontier.optimalStartTime(curationState),
+	}
+
+	prospectiveKey, err := coding.NewProtocolBuffer(startKey.ToDTO()).Encode()
+	if err != nil {
+		// An encoding failure of a key is no reason to stop.
+		return &storage.OperatorError{err, true}
+	}
+	if !w.sampleIterator.Seek(prospectiveKey) {
+		// LevelDB is picky about the seek ranges. If an iterator was invalidated,
+		// no work may occur, and the iterator cannot be recovered.
+		return &storage.OperatorError{fmt.Errorf("Illegal Condition: Iterator invalidated due to seek range."), false}
+	}
+
+	newestAllowedSample := w.stopAt
+	if !newestAllowedSample.Before(seriesFrontier.lastSupertime) {
+		newestAllowedSample = seriesFrontier.lastSupertime
+	}
+
+	lastTime, err := w.processor.Apply(w.sampleIterator, w.samples, newestAllowedSample, fingerprint)
+	if err != nil {
+		// We can't divine the severity of a processor error without refactoring the
+		// interface.
+		return &storage.OperatorError{err, false}
+	}
+
+	err = w.refreshCurationRemark(fingerprint, lastTime)
+	if err != nil {
+		// Under the assumption that the processors are idempotent, they can be
+		// re-run; thus, the commitment of the curation remark is no cause
+		// to cease further progress.
+ return &storage.OperatorError{err, true} + } + + return +} + +func (w watermarkOperator) refreshCurationRemark(f model.Fingerprint, finished time.Time) (err error) { + signature, err := w.processor.Signature() + if err != nil { + return + } + curationKey := model.CurationKey{ + Fingerprint: f, + ProcessorMessageRaw: signature, + ProcessorMessageTypeName: w.processor.Name(), + IgnoreYoungerThan: w.ignoreYoungerThan, + }.ToDTO() + curationValue := model.CurationRemark{ + LastCompletionTimestamp: finished, + }.ToDTO() + + err = w.curationState.Put(coding.NewProtocolBuffer(curationKey), coding.NewProtocolBuffer(curationValue)) + + return +} diff --git a/storage/metric/curator_test.go b/storage/metric/curator_test.go deleted file mode 100644 index 782dcd0e9..000000000 --- a/storage/metric/curator_test.go +++ /dev/null @@ -1,523 +0,0 @@ -// Copyright 2013 Prometheus Team -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package metric - -import ( - "code.google.com/p/goprotobuf/proto" - "github.com/prometheus/prometheus/coding" - "github.com/prometheus/prometheus/coding/indexable" - "github.com/prometheus/prometheus/model" - dto "github.com/prometheus/prometheus/model/generated" - "github.com/prometheus/prometheus/storage/raw/leveldb" - fixture "github.com/prometheus/prometheus/storage/raw/leveldb/test" - "testing" - "time" -) - -type ( - curationState struct { - fingerprint string - groupSize int - recencyThreshold time.Duration - lastCurated time.Time - } - - watermarkState struct { - fingerprint string - lastAppended time.Time - } - - sample struct { - time time.Time - value model.SampleValue - } - - sampleGroup struct { - fingerprint string - values []sample - } - - in struct { - curationStates fixture.Pairs - watermarkStates fixture.Pairs - sampleGroups fixture.Pairs - recencyThreshold time.Duration - groupSize uint32 - } -) - -func (c curationState) Get() (key, value coding.Encoder) { - key = coding.NewProtocolBuffer(&dto.CurationKey{ - Fingerprint: model.NewFingerprintFromRowKey(c.fingerprint).ToDTO(), - MinimumGroupSize: proto.Uint32(uint32(c.groupSize)), - OlderThan: proto.Int64(int64(c.recencyThreshold)), - }) - - value = coding.NewProtocolBuffer(&dto.CurationValue{ - LastCompletionTimestamp: proto.Int64(c.lastCurated.Unix()), - }) - - return -} - -func (w watermarkState) Get() (key, value coding.Encoder) { - key = coding.NewProtocolBuffer(model.NewFingerprintFromRowKey(w.fingerprint).ToDTO()) - value = coding.NewProtocolBuffer(model.NewWatermarkFromTime(w.lastAppended).ToMetricHighWatermarkDTO()) - return -} - -func (s sampleGroup) Get() (key, value coding.Encoder) { - key = coding.NewProtocolBuffer(&dto.SampleKey{ - Fingerprint: model.NewFingerprintFromRowKey(s.fingerprint).ToDTO(), - Timestamp: indexable.EncodeTime(s.values[0].time), - LastTimestamp: proto.Int64(s.values[len(s.values)-1].time.Unix()), - SampleCount: proto.Uint32(uint32(len(s.values))), - }) - - series := &dto.SampleValueSeries{} - - for _, value := range s.values { - series.Value = append(series.Value, 
&dto.SampleValueSeries_Value{ - Timestamp: proto.Int64(value.time.Unix()), - Value: value.value.ToDTO(), - }) - } - - value = coding.NewProtocolBuffer(series) - - return -} - -func TestCurator(t *testing.T) { - var ( - scenarios = []struct { - in in - }{ - { - in: in{ - recencyThreshold: 1 * time.Hour, - groupSize: 5, - curationStates: fixture.Pairs{ - curationState{ - fingerprint: "0001-A-1-Z", - groupSize: 5, - recencyThreshold: 1 * time.Hour, - lastCurated: testInstant.Add(-1 * 30 * time.Minute), - }, - curationState{ - fingerprint: "0002-A-2-Z", - groupSize: 5, - recencyThreshold: 1 * time.Hour, - lastCurated: testInstant.Add(-1 * 90 * time.Minute), - }, - // This rule should effectively be ignored. - curationState{ - fingerprint: "0002-A-2-Z", - groupSize: 2, - recencyThreshold: 30 * time.Minute, - lastCurated: testInstant.Add(-1 * 90 * time.Minute), - }, - }, - watermarkStates: fixture.Pairs{ - watermarkState{ - fingerprint: "0001-A-1-Z", - lastAppended: testInstant.Add(-1 * 15 * time.Minute), - }, - watermarkState{ - fingerprint: "0002-A-2-Z", - lastAppended: testInstant.Add(-1 * 15 * time.Minute), - }, - }, - sampleGroups: fixture.Pairs{ - sampleGroup{ - fingerprint: "0001-A-1-Z", - values: []sample{ - { - time: testInstant.Add(-1 * 90 * time.Minute), - value: 0, - }, - { - time: testInstant.Add(-1 * 85 * time.Minute), - value: 1, - }, - { - time: testInstant.Add(-1 * 80 * time.Minute), - value: 2, - }, - { - time: testInstant.Add(-1 * 75 * time.Minute), - value: 3, - }, - { - time: testInstant.Add(-1 * 70 * time.Minute), - value: 4, - }, - }, - }, - sampleGroup{ - fingerprint: "0001-A-1-Z", - values: []sample{ - { - time: testInstant.Add(-1 * 65 * time.Minute), - value: 0, - }, - { - time: testInstant.Add(-1 * 60 * time.Minute), - value: 1, - }, - { - time: testInstant.Add(-1 * 55 * time.Minute), - value: 2, - }, - { - time: testInstant.Add(-1 * 50 * time.Minute), - value: 3, - }, - { - time: testInstant.Add(-1 * 45 * time.Minute), - value: 4, - }, - }, - }, - sampleGroup{ - fingerprint: "0001-A-1-Z", - values: []sample{ - { - time: testInstant.Add(-1 * 40 * time.Minute), - value: 0, - }, - { - time: testInstant.Add(-1 * 35 * time.Minute), - value: 1, - }, - { - time: testInstant.Add(-1 * 30 * time.Minute), - value: 2, - }, - }, - }, - sampleGroup{ - fingerprint: "0001-A-1-Z", - values: []sample{ - { - time: testInstant.Add(-1 * 25 * time.Minute), - value: 0, - }, - }, - }, - sampleGroup{ - fingerprint: "0001-A-1-Z", - values: []sample{ - { - time: testInstant.Add(-1 * 35 * time.Minute), - value: 1, - }, - }, - }, - sampleGroup{ - fingerprint: "0001-A-1-Z", - values: []sample{ - { - time: testInstant.Add(-1 * 30 * time.Minute), - value: 2, - }, - }, - }, - sampleGroup{ - fingerprint: "0002-A-2-Z", - values: []sample{ - { - time: testInstant.Add(-1 * 90 * time.Minute), - value: 0, - }, - }, - }, - sampleGroup{ - fingerprint: "0002-A-2-Z", - values: []sample{ - { - time: testInstant.Add(-1 * 89 * time.Minute), - value: 1, - }, - }, - }, - sampleGroup{ - fingerprint: "0002-A-2-Z", - values: []sample{ - { - time: testInstant.Add(-1 * 88 * time.Minute), - value: 2, - }, - }, - }, - sampleGroup{ - fingerprint: "0002-A-2-Z", - values: []sample{ - { - time: testInstant.Add(-1 * 87 * time.Minute), - value: 3, - }, - }, - }, - sampleGroup{ - fingerprint: "0002-A-2-Z", - values: []sample{ - { - time: testInstant.Add(-1 * 86 * time.Minute), - value: 4, - }, - }, - }, - sampleGroup{ - fingerprint: "0002-A-2-Z", - values: []sample{ - { - time: testInstant.Add(-1 * 85 * time.Minute), - value: 5, 
- }, - }, - }, - sampleGroup{ - fingerprint: "0002-A-2-Z", - values: []sample{ - { - time: testInstant.Add(-1 * 84 * time.Minute), - value: 6, - }, - }, - }, - sampleGroup{ - fingerprint: "0002-A-2-Z", - values: []sample{ - { - time: testInstant.Add(-1 * 83 * time.Minute), - value: 7, - }, - }, - }, - sampleGroup{ - fingerprint: "0002-A-2-Z", - values: []sample{ - { - time: testInstant.Add(-1 * 82 * time.Minute), - value: 8, - }, - }, - }, - sampleGroup{ - fingerprint: "0002-A-2-Z", - values: []sample{ - { - time: testInstant.Add(-1 * 81 * time.Minute), - value: 9, - }, - }, - }, - sampleGroup{ - fingerprint: "0002-A-2-Z", - values: []sample{ - { - time: testInstant.Add(-1 * 80 * time.Minute), - value: 10, - }, - }, - }, - sampleGroup{ - fingerprint: "0002-A-2-Z", - values: []sample{ - { - time: testInstant.Add(-1 * 79 * time.Minute), - value: 11, - }, - }, - }, - sampleGroup{ - fingerprint: "0002-A-2-Z", - values: []sample{ - { - time: testInstant.Add(-1 * 78 * time.Minute), - value: 12, - }, - }, - }, - sampleGroup{ - fingerprint: "0002-A-2-Z", - values: []sample{ - { - time: testInstant.Add(-1 * 77 * time.Minute), - value: 13, - }, - }, - }, - sampleGroup{ - fingerprint: "0002-A-2-Z", - values: []sample{ - { - time: testInstant.Add(-1 * 76 * time.Minute), - value: 14, - }, - { - time: testInstant.Add(-1 * 75 * time.Minute), - value: 15, - }, - { - time: testInstant.Add(-1 * 74 * time.Minute), - value: 16, - }, - { - time: testInstant.Add(-1 * 73 * time.Minute), - value: 17, - }, - { - time: testInstant.Add(-1 * 72 * time.Minute), - value: 18, - }, - { - time: testInstant.Add(-1 * 71 * time.Minute), - value: 19, - }, - { - time: testInstant.Add(-1 * 70 * time.Minute), - value: 20, - }, - }, - }, - sampleGroup{ - fingerprint: "0002-A-2-Z", - values: []sample{ - { - time: testInstant.Add(-1 * 69 * time.Minute), - value: 21, - }, - }, - }, - sampleGroup{ - fingerprint: "0002-A-2-Z", - values: []sample{ - { - time: testInstant.Add(-1 * 68 * time.Minute), - value: 22, - }, - }, - }, - sampleGroup{ - fingerprint: "0002-A-2-Z", - values: []sample{ - { - time: testInstant.Add(-1 * 67 * time.Minute), - value: 23, - }, - }, - }, - sampleGroup{ - fingerprint: "0002-A-2-Z", - values: []sample{ - { - time: testInstant.Add(-1 * 66 * time.Minute), - value: 24, - }, - }, - }, - sampleGroup{ - fingerprint: "0002-A-2-Z", - values: []sample{ - { - time: testInstant.Add(-1 * 65 * time.Minute), - value: 25, - }, - }, - }, - sampleGroup{ - fingerprint: "0002-A-2-Z", - values: []sample{ - { - time: testInstant.Add(-1 * 64 * time.Minute), - value: 26, - }, - }, - }, - sampleGroup{ - fingerprint: "0002-A-2-Z", - values: []sample{ - { - time: testInstant.Add(-1 * 63 * time.Minute), - value: 27, - }, - }, - }, - sampleGroup{ - fingerprint: "0002-A-2-Z", - values: []sample{ - { - time: testInstant.Add(-1 * 62 * time.Minute), - value: 28, - }, - }, - }, - sampleGroup{ - fingerprint: "0002-A-2-Z", - values: []sample{ - { - time: testInstant.Add(-1 * 61 * time.Minute), - value: 29, - }, - }, - }, - sampleGroup{ - fingerprint: "0002-A-2-Z", - values: []sample{ - { - time: testInstant.Add(-1 * 60 * time.Minute), - value: 30, - }, - }, - }, - }, - }, - }, - } - ) - - for _, scenario := range scenarios { - curatorDirectory := fixture.NewPreparer(t).Prepare("curator", fixture.NewCassetteFactory(scenario.in.curationStates)) - defer curatorDirectory.Close() - - watermarkDirectory := fixture.NewPreparer(t).Prepare("watermark", fixture.NewCassetteFactory(scenario.in.watermarkStates)) - defer watermarkDirectory.Close() - - 
sampleDirectory := fixture.NewPreparer(t).Prepare("sample", fixture.NewCassetteFactory(scenario.in.sampleGroups))
-		defer sampleDirectory.Close()
-
-		curatorStates, err := leveldb.NewLevelDBPersistence(curatorDirectory.Path(), 0, 0)
-		if err != nil {
-			t.Fatal(err)
-		}
-		defer curatorStates.Close()
-
-		watermarkStates, err := leveldb.NewLevelDBPersistence(watermarkDirectory.Path(), 0, 0)
-		if err != nil {
-			t.Fatal(err)
-		}
-		defer watermarkStates.Close()
-
-		samples, err := leveldb.NewLevelDBPersistence(sampleDirectory.Path(), 0, 0)
-		if err != nil {
-			t.Fatal(err)
-		}
-		defer samples.Close()
-
-		c := newCurator(scenario.in.recencyThreshold, scenario.in.groupSize, curatorStates, samples, watermarkStates)
-		c.run(testInstant)
-	}
-}
diff --git a/storage/metric/frontier.go b/storage/metric/frontier.go
index a6be095d7..7ea427893 100644
--- a/storage/metric/frontier.go
+++ b/storage/metric/frontier.go
@@ -51,7 +51,7 @@ func newDiskFrontier(i leveldb.Iterator) (d *diskFrontier, err error) {
 
 	lastKey, err := extractSampleKey(i)
 	if err != nil {
-		panic(err)
+		panic(fmt.Sprintln(err, i.Key(), i.Value()))
 	}
 
 	if !i.SeekToFirst() || i.Key() == nil {
@@ -90,10 +90,8 @@ func (f seriesFrontier) String() string {
 // fingerprint. A nil seriesFrontier will be returned if the series cannot
 // be found in the store.
 func newSeriesFrontier(f model.Fingerprint, d diskFrontier, i leveldb.Iterator) (s *seriesFrontier, err error) {
-	var (
-		lowerSeek = firstSupertime
-		upperSeek = lastSupertime
-	)
+	lowerSeek := firstSupertime
+	upperSeek := lastSupertime
 
 	// If the diskFrontier for this iterator says that the candidate fingerprint
 	// is outside of its seeking domain, there is no way that a seriesFrontier
@@ -180,3 +178,44 @@ func newSeriesFrontier(f model.Fingerprint, d diskFrontier, i leveldb.Iterator)
 
 	return
 }
+
+// Contains indicates whether a given time value is within the recorded
+// interval.
+func (s seriesFrontier) Contains(t time.Time) bool {
+	return !(t.Before(s.firstSupertime) || t.After(s.lastTime))
+}
+
+// InSafeSeekRange indicates whether the time is within the recorded time range
+// and is safely seekable such that a seek does not leave the iterator pointing
+// after the last value of the series or outside of the entire store.
+func (s seriesFrontier) InSafeSeekRange(t time.Time) (safe bool) {
+	if !s.Contains(t) {
+		return
+	}
+
+	if s.lastSupertime.Before(t) {
+		return
+	}
+
+	return true
+}
+
+func (s seriesFrontier) After(t time.Time) bool {
+	return s.firstSupertime.After(t)
+}
+
+// optimalStartTime indicates what the best start time for a curation operation
+// should be given the curation remark.
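+//
+// For example (hypothetical times): if curation last completed at 10:00 but
+// the series' first supertime is 11:00, the remark predates the series, so
+// scanning restarts at firstSupertime; a remark outside the safe seek range
+// instead yields lastSupertime, effectively marking the series as finished.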
+func (s seriesFrontier) optimalStartTime(remark *model.CurationRemark) (t time.Time) { + switch { + case remark == nil: + t = s.firstSupertime + case s.After(remark.LastCompletionTimestamp): + t = s.firstSupertime + case !s.InSafeSeekRange(remark.LastCompletionTimestamp): + t = s.lastSupertime + default: + t = remark.LastCompletionTimestamp + } + return +} diff --git a/storage/metric/instrumentation.go b/storage/metric/instrumentation.go index cb3381fb0..136853bdc 100644 --- a/storage/metric/instrumentation.go +++ b/storage/metric/instrumentation.go @@ -54,6 +54,9 @@ const ( setLabelNameFingerprints = "set_label_name_fingerprints" setLabelPairFingerprints = "set_label_pair_fingerprints" writeMemory = "write_memory" + + cutOff = "recency_threshold" + processorName = "processor" ) var ( @@ -65,6 +68,7 @@ var ( curationDuration = metrics.NewCounter() curationDurations = metrics.NewHistogram(diskLatencyHistogram) + curationFilterOperations = metrics.NewCounter() storageOperations = metrics.NewCounter() storageOperationDurations = metrics.NewCounter() storageLatency = metrics.NewHistogram(diskLatencyHistogram) @@ -88,4 +92,5 @@ func init() { registry.Register("prometheus_metric_disk_latency_microseconds", "Latency for metric disk operations in microseconds.", registry.NilLabels, storageLatency) registry.Register("prometheus_storage_operation_time_total_microseconds", "The total time spent performing a given storage operation.", registry.NilLabels, storageOperationDurations) registry.Register("prometheus_storage_queue_sizes_total", "The various sizes and capacities of the storage queues.", registry.NilLabels, queueSizes) + registry.Register("curation_filter_operations_total", "The number of curation filter operations completed.", registry.NilLabels, curationFilterOperations) } diff --git a/storage/metric/leveldb.go b/storage/metric/leveldb.go index ee8255916..17cebfa1c 100644 --- a/storage/metric/leveldb.go +++ b/storage/metric/leveldb.go @@ -164,12 +164,11 @@ func NewLevelDBMetricPersistence(baseDirectory string) (persistence *LevelDBMetr } func (l *LevelDBMetricPersistence) AppendSample(sample model.Sample) (err error) { - begin := time.Now() - defer func() { + defer func(begin time.Time) { duration := time.Since(begin) recordOutcome(duration, err, map[string]string{operation: appendSample, result: success}, map[string]string{operation: appendSample, result: failure}) - }() + }(time.Now()) err = l.AppendSamples(model.Samples{sample}) @@ -220,23 +219,20 @@ func groupByFingerprint(samples model.Samples) map[model.Fingerprint]model.Sampl // in the keyspace and returns a map of Fingerprint-Metric pairs that are // absent. func (l *LevelDBMetricPersistence) findUnindexedMetrics(candidates map[model.Fingerprint]model.Metric) (unindexed map[model.Fingerprint]model.Metric, err error) { - begin := time.Now() - defer func() { + defer func(begin time.Time) { duration := time.Since(begin) recordOutcome(duration, err, map[string]string{operation: findUnindexedMetrics, result: success}, map[string]string{operation: findUnindexedMetrics, result: failure}) - }() + }(time.Now()) unindexed = make(map[model.Fingerprint]model.Metric) // Determine which metrics are unknown in the database. 
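+	// A failed membership check aborts the scan; the partially filled map is
+	// handed back alongside the error.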
for fingerprint, metric := range candidates { - var ( - dto = model.MetricToDTO(metric) - indexHas, err = l.hasIndexMetric(dto) - ) + dto := model.MetricToDTO(metric) + indexHas, err := l.hasIndexMetric(dto) if err != nil { - panic(err) + return unindexed, err } if !indexHas { unindexed[fingerprint] = metric @@ -252,12 +248,11 @@ func (l *LevelDBMetricPersistence) findUnindexedMetrics(candidates map[model.Fin // // This operation is idempotent. func (l *LevelDBMetricPersistence) indexLabelNames(metrics map[model.Fingerprint]model.Metric) (err error) { - begin := time.Now() - defer func() { + defer func(begin time.Time) { duration := time.Since(begin) recordOutcome(duration, err, map[string]string{operation: indexLabelNames, result: success}, map[string]string{operation: indexLabelNames, result: failure}) - }() + }(time.Now()) labelNameFingerprints := map[model.LabelName]utility.Set{} @@ -269,7 +264,6 @@ func (l *LevelDBMetricPersistence) indexLabelNames(metrics map[model.Fingerprint fingerprints, err := l.GetFingerprintsForLabelName(labelName) if err != nil { - panic(err) return err } @@ -307,7 +301,6 @@ func (l *LevelDBMetricPersistence) indexLabelNames(metrics map[model.Fingerprint err = l.labelNameToFingerprints.Commit(batch) if err != nil { - panic(err) return } @@ -320,12 +313,11 @@ func (l *LevelDBMetricPersistence) indexLabelNames(metrics map[model.Fingerprint // // This operation is idempotent. func (l *LevelDBMetricPersistence) indexLabelPairs(metrics map[model.Fingerprint]model.Metric) (err error) { - begin := time.Now() - defer func() { + defer func(begin time.Time) { duration := time.Since(begin) recordOutcome(duration, err, map[string]string{operation: indexLabelPairs, result: success}, map[string]string{operation: indexLabelPairs, result: failure}) - }() + }(time.Now()) labelPairFingerprints := map[model.LabelPair]utility.Set{} @@ -343,7 +335,6 @@ func (l *LevelDBMetricPersistence) indexLabelPairs(metrics map[model.Fingerprint labelName: labelValue, }) if err != nil { - panic(err) return err } @@ -382,7 +373,6 @@ func (l *LevelDBMetricPersistence) indexLabelPairs(metrics map[model.Fingerprint err = l.labelSetToFingerprints.Commit(batch) if err != nil { - panic(err) return } @@ -394,12 +384,11 @@ func (l *LevelDBMetricPersistence) indexLabelPairs(metrics map[model.Fingerprint // // This operation is idempotent. func (l *LevelDBMetricPersistence) indexFingerprints(metrics map[model.Fingerprint]model.Metric) (err error) { - begin := time.Now() - defer func() { + defer func(begin time.Time) { duration := time.Since(begin) recordOutcome(duration, err, map[string]string{operation: indexFingerprints, result: success}, map[string]string{operation: indexFingerprints, result: failure}) - }() + }(time.Now()) batch := leveldb.NewBatch() defer batch.Close() @@ -412,7 +401,7 @@ func (l *LevelDBMetricPersistence) indexFingerprints(metrics map[model.Fingerpri err = l.fingerprintToMetrics.Commit(batch) if err != nil { - panic(err) + return } return @@ -422,12 +411,11 @@ func (l *LevelDBMetricPersistence) indexFingerprints(metrics map[model.Fingerpri // that are unknown to the storage stack, and then proceeds to update all // affected indices. 
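+// The label name, label pair, and fingerprint sub-indices are built
+// concurrently; the first error received aborts the update of the top-level
+// metric membership index, whose state marks a metric as fully indexed.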
func (l *LevelDBMetricPersistence) indexMetrics(fingerprints map[model.Fingerprint]model.Metric) (err error) { - begin := time.Now() - defer func() { + defer func(begin time.Time) { duration := time.Since(begin) recordOutcome(duration, err, map[string]string{operation: indexMetrics, result: success}, map[string]string{operation: indexMetrics, result: failure}) - }() + }(time.Now()) var ( absentMetrics map[model.Fingerprint]model.Metric @@ -435,7 +423,7 @@ func (l *LevelDBMetricPersistence) indexMetrics(fingerprints map[model.Fingerpri absentMetrics, err = l.findUnindexedMetrics(fingerprints) if err != nil { - panic(err) + return } if len(absentMetrics) == 0 { @@ -444,11 +432,9 @@ func (l *LevelDBMetricPersistence) indexMetrics(fingerprints map[model.Fingerpri // TODO: For the missing fingerprints, determine what label names and pairs // are absent and act accordingly and append fingerprints. - var ( - doneBuildingLabelNameIndex = make(chan error) - doneBuildingLabelPairIndex = make(chan error) - doneBuildingFingerprintIndex = make(chan error) - ) + doneBuildingLabelNameIndex := make(chan error) + doneBuildingLabelPairIndex := make(chan error) + doneBuildingFingerprintIndex := make(chan error) go func() { doneBuildingLabelNameIndex <- l.indexLabelNames(absentMetrics) @@ -462,22 +448,17 @@ func (l *LevelDBMetricPersistence) indexMetrics(fingerprints map[model.Fingerpri doneBuildingFingerprintIndex <- l.indexFingerprints(absentMetrics) }() - makeTopLevelIndex := true - err = <-doneBuildingLabelNameIndex if err != nil { - panic(err) - makeTopLevelIndex = false + return } err = <-doneBuildingLabelPairIndex if err != nil { - panic(err) - makeTopLevelIndex = false + return } err = <-doneBuildingFingerprintIndex if err != nil { - panic(err) - makeTopLevelIndex = false + return } // If any of the preceding operations failed, we will have inconsistent @@ -485,62 +466,51 @@ func (l *LevelDBMetricPersistence) indexMetrics(fingerprints map[model.Fingerpri // its state is used to determine whether to bulk update the other indices. // Given that those operations are idempotent, it is OK to repeat them; // however, it will consume considerable amounts of time. - if makeTopLevelIndex { - batch := leveldb.NewBatch() - defer batch.Close() + batch := leveldb.NewBatch() + defer batch.Close() - // WART: We should probably encode simple fingerprints. - for _, metric := range absentMetrics { - key := coding.NewProtocolBuffer(model.MetricToDTO(metric)) - batch.Put(key, key) - } + // WART: We should probably encode simple fingerprints. + for _, metric := range absentMetrics { + key := coding.NewProtocolBuffer(model.MetricToDTO(metric)) + batch.Put(key, key) + } - err := l.metricMembershipIndex.Commit(batch) - if err != nil { - panic(err) - // Not critical. 
- log.Println(err) - } + err = l.metricMembershipIndex.Commit(batch) + if err != nil { + return err } return } func (l *LevelDBMetricPersistence) refreshHighWatermarks(groups map[model.Fingerprint]model.Samples) (err error) { - begin := time.Now() - defer func() { + defer func(begin time.Time) { duration := time.Since(begin) recordOutcome(duration, err, map[string]string{operation: refreshHighWatermarks, result: success}, map[string]string{operation: refreshHighWatermarks, result: failure}) - }() + }(time.Now()) batch := leveldb.NewBatch() defer batch.Close() - var ( - mutationCount = 0 - ) + mutationCount := 0 for fingerprint, samples := range groups { - var ( - key = &dto.Fingerprint{} - value = &dto.MetricHighWatermark{} - raw []byte - newestSampleTimestamp = samples[len(samples)-1].Timestamp - keyEncoded = coding.NewProtocolBuffer(key) - ) + key := &dto.Fingerprint{} + value := &dto.MetricHighWatermark{} + raw := []byte{} + newestSampleTimestamp := samples[len(samples)-1].Timestamp + keyEncoded := coding.NewProtocolBuffer(key) key.Signature = proto.String(fingerprint.ToRowKey()) raw, err = l.metricHighWatermarks.Get(keyEncoded) if err != nil { - panic(err) return } if raw != nil { err = proto.Unmarshal(raw, value) if err != nil { - panic(err) - continue + return } if newestSampleTimestamp.Before(time.Unix(*value.Timestamp, 0)) { @@ -554,19 +524,18 @@ func (l *LevelDBMetricPersistence) refreshHighWatermarks(groups map[model.Finger err = l.metricHighWatermarks.Commit(batch) if err != nil { - panic(err) + return } return } func (l *LevelDBMetricPersistence) AppendSamples(samples model.Samples) (err error) { - begin := time.Now() - defer func() { + defer func(begin time.Time) { duration := time.Since(begin) recordOutcome(duration, err, map[string]string{operation: appendSamples, result: success}, map[string]string{operation: appendSamples, result: failure}) - }() + }(time.Now()) var ( fingerprintToSamples = groupByFingerprint(samples) @@ -630,17 +599,17 @@ func (l *LevelDBMetricPersistence) AppendSamples(samples model.Samples) (err err err = l.metricSamples.Commit(samplesBatch) if err != nil { - panic(err) + return } err = <-indexErrChan if err != nil { - panic(err) + return } err = <-watermarkErrChan if err != nil { - panic(err) + return } return @@ -691,13 +660,11 @@ func fingerprintsEqual(l *dto.Fingerprint, r *dto.Fingerprint) bool { } func (l *LevelDBMetricPersistence) hasIndexMetric(dto *dto.Metric) (value bool, err error) { - begin := time.Now() - - defer func() { + defer func(begin time.Time) { duration := time.Since(begin) recordOutcome(duration, err, map[string]string{operation: hasIndexMetric, result: success}, map[string]string{operation: hasIndexMetric, result: failure}) - }() + }(time.Now()) dtoKey := coding.NewProtocolBuffer(dto) value, err = l.metricMembershipIndex.Has(dtoKey) @@ -706,13 +673,11 @@ func (l *LevelDBMetricPersistence) hasIndexMetric(dto *dto.Metric) (value bool, } func (l *LevelDBMetricPersistence) HasLabelPair(dto *dto.LabelPair) (value bool, err error) { - begin := time.Now() - - defer func() { + defer func(begin time.Time) { duration := time.Since(begin) recordOutcome(duration, err, map[string]string{operation: hasLabelPair, result: success}, map[string]string{operation: hasLabelPair, result: failure}) - }() + }(time.Now()) dtoKey := coding.NewProtocolBuffer(dto) value, err = l.labelSetToFingerprints.Has(dtoKey) @@ -721,13 +686,11 @@ func (l *LevelDBMetricPersistence) HasLabelPair(dto *dto.LabelPair) (value bool, } func (l *LevelDBMetricPersistence) 
HasLabelName(dto *dto.LabelName) (value bool, err error) { - begin := time.Now() - - defer func() { + defer func(begin time.Time) { duration := time.Since(begin) recordOutcome(duration, err, map[string]string{operation: hasLabelName, result: success}, map[string]string{operation: hasLabelName, result: failure}) - }() + }(time.Now()) dtoKey := coding.NewProtocolBuffer(dto) value, err = l.labelNameToFingerprints.Has(dtoKey) @@ -736,13 +699,11 @@ func (l *LevelDBMetricPersistence) HasLabelName(dto *dto.LabelName) (value bool, } func (l *LevelDBMetricPersistence) GetFingerprintsForLabelSet(labelSet model.LabelSet) (fps model.Fingerprints, err error) { - begin := time.Now() - - defer func() { + defer func(begin time.Time) { duration := time.Since(begin) recordOutcome(duration, err, map[string]string{operation: getFingerprintsForLabelSet, result: success}, map[string]string{operation: getFingerprintsForLabelSet, result: failure}) - }() + }(time.Now()) sets := []utility.Set{} @@ -786,13 +747,11 @@ func (l *LevelDBMetricPersistence) GetFingerprintsForLabelSet(labelSet model.Lab } func (l *LevelDBMetricPersistence) GetFingerprintsForLabelName(labelName model.LabelName) (fps model.Fingerprints, err error) { - begin := time.Now() - - defer func() { + defer func(begin time.Time) { duration := time.Since(begin) recordOutcome(duration, err, map[string]string{operation: getFingerprintsForLabelName, result: success}, map[string]string{operation: getFingerprintsForLabelName, result: failure}) - }() + }(time.Now()) raw, err := l.labelNameToFingerprints.Get(coding.NewProtocolBuffer(model.LabelNameToDTO(&labelName))) if err != nil { @@ -815,13 +774,11 @@ func (l *LevelDBMetricPersistence) GetFingerprintsForLabelName(labelName model.L } func (l *LevelDBMetricPersistence) GetMetricForFingerprint(f model.Fingerprint) (m *model.Metric, err error) { - begin := time.Now() - - defer func() { + defer func(begin time.Time) { duration := time.Since(begin) recordOutcome(duration, err, map[string]string{operation: getMetricForFingerprint, result: success}, map[string]string{operation: getMetricForFingerprint, result: failure}) - }() + }(time.Now()) raw, err := l.fingerprintToMetrics.Get(coding.NewProtocolBuffer(model.FingerprintToDTO(f))) if err != nil { diff --git a/storage/metric/processor.go b/storage/metric/processor.go new file mode 100644 index 000000000..5031c2292 --- /dev/null +++ b/storage/metric/processor.go @@ -0,0 +1,233 @@ +// Copyright 2013 Prometheus Team +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package metric + +import ( + "code.google.com/p/goprotobuf/proto" + "fmt" + "github.com/prometheus/prometheus/coding" + "github.com/prometheus/prometheus/model" + dto "github.com/prometheus/prometheus/model/generated" + "github.com/prometheus/prometheus/storage/raw" + "github.com/prometheus/prometheus/storage/raw/leveldb" + "time" +) + +// processor models a post-processing agent that performs work given a sample +// corpus. 
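+//
+// A processor is applied one fingerprint at a time by the curation machinery;
+// its Signature is folded into the CurationKey under which curation remarks
+// are stored, so progress made under one configuration (say, a different
+// MinimumGroupSize) does not mask work still owed under another.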
+type processor interface {
+	// Name emits the name of this processor's signature encoder. It must be
+	// fully-qualified in the sense that it could be used via a Protocol Buffer
+	// registry to extract the descriptor to reassemble this message.
+	Name() string
+	// Signature emits a byte signature for this processor for the purpose of
+	// remarking how far along it has been applied to the database.
+	Signature() (signature []byte, err error)
+	// Apply runs this processor against the sample set. sampleIterator expects
+	// to be pre-seeked to the initial starting position. The processor will
+	// run up until stopAt has been reached. It is imperative that the
+	// provided stopAt is within the interval of the series frontier.
+	//
+	// Upon completion or error, the last time at which the processor finished
+	// shall be emitted in addition to any errors.
+	Apply(sampleIterator leveldb.Iterator, samples raw.Persistence, stopAt time.Time, fingerprint model.Fingerprint) (lastCurated time.Time, err error)
+}
+
+// compactionProcessor combines sparse sample chunks in the database together
+// such that each rewritten chunk fulfills at least MinimumGroupSize samples.
+// For example, with MinimumGroupSize = 5, five adjacent single-sample chunks
+// for a fingerprint are dropped and rewritten as one five-sample chunk.
+type compactionProcessor struct {
+	// MaximumMutationPoolBatch represents approximately the largest pending
+	// batch of mutation operations for the database that may accumulate
+	// before the processor pauses to commit and then resumes.
+	//
+	// A reasonable value would be (MinimumGroupSize * 2) + 1.
+	MaximumMutationPoolBatch int
+	// MinimumGroupSize represents the smallest allowed sample chunk size in the
+	// database.
+	MinimumGroupSize int
+	// signature is the byte representation of the compactionProcessor's settings,
+	// used purely for memoization purposes within an instance.
+	signature []byte
+}
+
+func (p compactionProcessor) Name() string {
+	return "io.prometheus.CompactionProcessorDefinition"
+}
+
+func (p *compactionProcessor) Signature() (out []byte, err error) {
+	if len(p.signature) == 0 {
+		out, err = proto.Marshal(&dto.CompactionProcessorDefinition{
+			MinimumGroupSize: proto.Uint32(uint32(p.MinimumGroupSize)),
+		})
+
+		p.signature = out
+	}
+
+	out = p.signature
+
+	return
+}
+
+func (p compactionProcessor) String() string {
+	return fmt.Sprintf("compactionProcessor for minimum group size %d", p.MinimumGroupSize)
+}
+
+func (p compactionProcessor) Apply(sampleIterator leveldb.Iterator, samples raw.Persistence, stopAt time.Time, fingerprint model.Fingerprint) (lastCurated time.Time, err error) {
+	var pendingBatch raw.Batch = nil
+
+	defer func() {
+		if pendingBatch != nil {
+			pendingBatch.Close()
+		}
+	}()
+
+	var pendingMutations = 0
+	var pendingSamples model.Values
+	var sampleKey model.SampleKey
+	var sampleValues model.Values
+	var lastTouchedTime time.Time
+	var keyDropped bool
+
+	sampleKey, err = extractSampleKey(sampleIterator)
+	if err != nil {
+		return
+	}
+	sampleValues, err = extractSampleValues(sampleIterator)
+	if err != nil {
+		return
+	}
+
+	for lastCurated.Before(stopAt) && lastTouchedTime.Before(stopAt) {
+		switch {
+		// Furnish a new pending batch operation if none is available.
+		case pendingBatch == nil:
+			pendingBatch = leveldb.NewBatch()
+
+		// If there are no sample values left in the current chunk, advance the
+		// iterator and extract more. The time.Before() loop conditions prevent
+		// us from going into unsafe territory.
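+		// (Per the Apply contract, the iterator arrives pre-seeked, so Next
+		// here advances to the chunk immediately following the one whose
+		// values were just consumed.)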
+		case len(sampleValues) == 0:
+			if !sampleIterator.Next() {
+				return lastCurated, fmt.Errorf("illegal condition: invalid iterator on continuation")
+			}
+
+			keyDropped = false
+
+			sampleKey, err = extractSampleKey(sampleIterator)
+			if err != nil {
+				return
+			}
+			sampleValues, err = extractSampleValues(sampleIterator)
+			if err != nil {
+				return
+			}
+
+		// If the number of pending mutations exceeds the allowed batch amount,
+		// commit to disk and delete the batch. A new one will be created if
+		// necessary.
+		case pendingMutations >= p.MaximumMutationPoolBatch:
+			err = samples.Commit(pendingBatch)
+			if err != nil {
+				return
+			}
+
+			pendingMutations = 0
+
+			pendingBatch.Close()
+			pendingBatch = nil
+
+		// The current chunk already fulfills the target group size on its own
+		// and nothing is pending, so it may be left as-is.
+		case len(pendingSamples) == 0 && len(sampleValues) >= p.MinimumGroupSize:
+			lastTouchedTime = sampleValues[len(sampleValues)-1].Timestamp
+			sampleValues = model.Values{}
+
+		// The pending buffer plus the current chunk still fall short of the
+		// target group size, so fold the chunk into the pending buffer and
+		// drop its old on-disk key.
+		case len(pendingSamples)+len(sampleValues) < p.MinimumGroupSize:
+			if !keyDropped {
+				key := coding.NewProtocolBuffer(sampleKey.ToDTO())
+				pendingBatch.Drop(key)
+				keyDropped = true
+			}
+			pendingSamples = append(pendingSamples, sampleValues...)
+			lastTouchedTime = sampleValues[len(sampleValues)-1].Timestamp
+			sampleValues = model.Values{}
+			pendingMutations++
+
+		// If the number of pending samples equals the target group size, write
+		// them out as a new chunk and, if values remain in the current chunk,
+		// refill the pending buffer from them.
+		case len(pendingSamples) == p.MinimumGroupSize:
+			newSampleKey := pendingSamples.ToSampleKey(fingerprint)
+			key := coding.NewProtocolBuffer(newSampleKey.ToDTO())
+			value := coding.NewProtocolBuffer(pendingSamples.ToDTO())
+			pendingBatch.Put(key, value)
+			pendingMutations++
+			lastCurated = newSampleKey.FirstTimestamp.In(time.UTC)
+			if len(sampleValues) > 0 {
+				if !keyDropped {
+					key := coding.NewProtocolBuffer(sampleKey.ToDTO())
+					pendingBatch.Drop(key)
+					keyDropped = true
+				}
+
+				if len(sampleValues) > p.MinimumGroupSize {
+					pendingSamples = sampleValues[:p.MinimumGroupSize]
+					sampleValues = sampleValues[p.MinimumGroupSize:]
+					lastTouchedTime = sampleValues[len(sampleValues)-1].Timestamp
+				} else {
+					pendingSamples = sampleValues
+					lastTouchedTime = pendingSamples[len(pendingSamples)-1].Timestamp
+					sampleValues = model.Values{}
+				}
+			}
+
+		// The pending buffer and the current chunk together reach the target
+		// group size, so top the buffer up to exactly MinimumGroupSize.
+		case len(pendingSamples)+len(sampleValues) >= p.MinimumGroupSize:
+			if !keyDropped {
+				key := coding.NewProtocolBuffer(sampleKey.ToDTO())
+				pendingBatch.Drop(key)
+				keyDropped = true
+			}
+			remainder := p.MinimumGroupSize - len(pendingSamples)
+			pendingSamples = append(pendingSamples, sampleValues[:remainder]...)
+			sampleValues = sampleValues[remainder:]
+			if len(sampleValues) == 0 {
+				lastTouchedTime = pendingSamples[len(pendingSamples)-1].Timestamp
+			} else {
+				lastTouchedTime = sampleValues[len(sampleValues)-1].Timestamp
+			}
+			pendingMutations++
+
+		default:
+			err = fmt.Errorf("unhandled processing case")
+		}
+	}
+
+	// Flush any remainder as a final, possibly undersized chunk.
+	if len(sampleValues) > 0 || len(pendingSamples) > 0 {
+		pendingSamples = append(sampleValues, pendingSamples...)
+		newSampleKey := pendingSamples.ToSampleKey(fingerprint)
+		key := coding.NewProtocolBuffer(newSampleKey.ToDTO())
+		value := coding.NewProtocolBuffer(pendingSamples.ToDTO())
+		pendingBatch.Put(key, value)
+		pendingSamples = model.Values{}
+		pendingMutations++
+		lastCurated = newSampleKey.FirstTimestamp.In(time.UTC)
+	}
+
+	// This is not deferred due to the off-chance that a pre-existing commit
+	// failed.
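+	// A deferred Commit would also fire after a mid-loop Commit error and
+	// could mask it; committing explicitly here touches only a batch that is
+	// both live and non-empty.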
+ if pendingBatch != nil && pendingMutations > 0 { + err = samples.Commit(pendingBatch) + if err != nil { + return + } + } + + return +} diff --git a/storage/metric/processor_test.go b/storage/metric/processor_test.go new file mode 100644 index 000000000..42050db7f --- /dev/null +++ b/storage/metric/processor_test.go @@ -0,0 +1,962 @@ +// Copyright 2013 Prometheus Team +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package metric + +import ( + "code.google.com/p/goprotobuf/proto" + "fmt" + "github.com/prometheus/prometheus/coding" + "github.com/prometheus/prometheus/model" + dto "github.com/prometheus/prometheus/model/generated" + "github.com/prometheus/prometheus/storage/raw/leveldb" + fixture "github.com/prometheus/prometheus/storage/raw/leveldb/test" + "testing" + "time" +) + +type curationState struct { + fingerprint string + ignoreYoungerThan time.Duration + lastCurated time.Time + processor processor +} + +type watermarkState struct { + fingerprint string + lastAppended time.Time +} + +type sampleGroup struct { + fingerprint string + values model.Values +} + +type in struct { + curationStates fixture.Pairs + watermarkStates fixture.Pairs + sampleGroups fixture.Pairs + ignoreYoungerThan time.Duration + groupSize uint32 + processor processor +} + +type out struct { + curationStates []curationState + sampleGroups []sampleGroup +} + +func (c curationState) Get() (key, value coding.Encoder) { + signature, err := c.processor.Signature() + if err != nil { + panic(err) + } + key = coding.NewProtocolBuffer(model.CurationKey{ + Fingerprint: model.NewFingerprintFromRowKey(c.fingerprint), + ProcessorMessageRaw: signature, + ProcessorMessageTypeName: c.processor.Name(), + IgnoreYoungerThan: c.ignoreYoungerThan, + }.ToDTO()) + + value = coding.NewProtocolBuffer(model.CurationRemark{ + LastCompletionTimestamp: c.lastCurated, + }.ToDTO()) + + return +} + +func (w watermarkState) Get() (key, value coding.Encoder) { + key = coding.NewProtocolBuffer(model.NewFingerprintFromRowKey(w.fingerprint).ToDTO()) + value = coding.NewProtocolBuffer(model.NewWatermarkFromTime(w.lastAppended).ToMetricHighWatermarkDTO()) + return +} + +func (s sampleGroup) Get() (key, value coding.Encoder) { + key = coding.NewProtocolBuffer(model.SampleKey{ + Fingerprint: model.NewFingerprintFromRowKey(s.fingerprint), + FirstTimestamp: s.values[0].Timestamp, + LastTimestamp: s.values[len(s.values)-1].Timestamp, + SampleCount: uint32(len(s.values)), + }.ToDTO()) + + value = coding.NewProtocolBuffer(s.values.ToDTO()) + + return +} + +func TestCuratorCompactionProcessor(t *testing.T) { + scenarios := []struct { + in in + out out + }{ + { + in: in{ + processor: &compactionProcessor{ + MinimumGroupSize: 5, + MaximumMutationPoolBatch: 15, + }, + ignoreYoungerThan: 1 * time.Hour, + groupSize: 5, + curationStates: fixture.Pairs{ + curationState{ + fingerprint: "0001-A-1-Z", + ignoreYoungerThan: 1 * time.Hour, + lastCurated: testInstant.Add(-1 * 30 * time.Minute), + processor: &compactionProcessor{ + MinimumGroupSize: 5, + 
MaximumMutationPoolBatch: 15, + }, + }, + curationState{ + fingerprint: "0002-A-2-Z", + ignoreYoungerThan: 1 * time.Hour, + lastCurated: testInstant.Add(-1 * 90 * time.Minute), + processor: &compactionProcessor{ + MinimumGroupSize: 5, + MaximumMutationPoolBatch: 15, + }, + }, + // This rule should effectively be ignored. + curationState{ + fingerprint: "0002-A-2-Z", + processor: &compactionProcessor{ + MinimumGroupSize: 2, + MaximumMutationPoolBatch: 15, + }, + ignoreYoungerThan: 30 * time.Minute, + lastCurated: testInstant.Add(-1 * 90 * time.Minute), + }, + }, + watermarkStates: fixture.Pairs{ + watermarkState{ + fingerprint: "0001-A-1-Z", + lastAppended: testInstant.Add(-1 * 15 * time.Minute), + }, + watermarkState{ + fingerprint: "0002-A-2-Z", + lastAppended: testInstant.Add(-1 * 15 * time.Minute), + }, + }, + sampleGroups: fixture.Pairs{ + sampleGroup{ + fingerprint: "0001-A-1-Z", + values: model.Values{ + { + Timestamp: testInstant.Add(-1 * 90 * time.Minute), + Value: 0, + }, + { + Timestamp: testInstant.Add(-1 * 85 * time.Minute), + Value: 1, + }, + { + Timestamp: testInstant.Add(-1 * 80 * time.Minute), + Value: 2, + }, + { + Timestamp: testInstant.Add(-1 * 75 * time.Minute), + Value: 3, + }, + { + Timestamp: testInstant.Add(-1 * 70 * time.Minute), + Value: 4, + }, + }, + }, + sampleGroup{ + fingerprint: "0001-A-1-Z", + values: model.Values{ + { + Timestamp: testInstant.Add(-1 * 65 * time.Minute), + Value: 0.25, + }, + { + Timestamp: testInstant.Add(-1 * 60 * time.Minute), + Value: 1.25, + }, + { + Timestamp: testInstant.Add(-1 * 55 * time.Minute), + Value: 2.25, + }, + { + Timestamp: testInstant.Add(-1 * 50 * time.Minute), + Value: 3.25, + }, + { + Timestamp: testInstant.Add(-1 * 45 * time.Minute), + Value: 4.25, + }, + }, + }, + sampleGroup{ + fingerprint: "0001-A-1-Z", + values: model.Values{ + { + Timestamp: testInstant.Add(-1 * 40 * time.Minute), + Value: 0.50, + }, + { + Timestamp: testInstant.Add(-1 * 35 * time.Minute), + Value: 1.50, + }, + { + Timestamp: testInstant.Add(-1 * 30 * time.Minute), + Value: 2.50, + }, + }, + }, + sampleGroup{ + fingerprint: "0001-A-1-Z", + values: model.Values{ + { + Timestamp: testInstant.Add(-1 * 25 * time.Minute), + Value: 0.75, + }, + }, + }, + sampleGroup{ + fingerprint: "0001-A-1-Z", + values: model.Values{ + { + Timestamp: testInstant.Add(-1 * 20 * time.Minute), + Value: -2, + }, + }, + }, + sampleGroup{ + fingerprint: "0001-A-1-Z", + values: model.Values{ + { + Timestamp: testInstant.Add(-1 * 15 * time.Minute), + Value: -3, + }, + }, + }, + sampleGroup{ + // Moved into Block 1 + fingerprint: "0002-A-2-Z", + values: model.Values{ + { + Timestamp: testInstant.Add(-1 * 90 * time.Minute), + Value: 0, + }, + }, + }, + sampleGroup{ + // Moved into Block 1 + fingerprint: "0002-A-2-Z", + values: model.Values{ + { + Timestamp: testInstant.Add(-1 * 89 * time.Minute), + Value: 1, + }, + }, + }, + sampleGroup{ + // Moved into Block 1 + fingerprint: "0002-A-2-Z", + values: model.Values{ + { + Timestamp: testInstant.Add(-1 * 88 * time.Minute), + Value: 2, + }, + }, + }, + sampleGroup{ + // Moved into Block 1 + fingerprint: "0002-A-2-Z", + values: model.Values{ + { + Timestamp: testInstant.Add(-1 * 87 * time.Minute), + Value: 3, + }, + }, + }, + sampleGroup{ + // Moved into Block 1 + fingerprint: "0002-A-2-Z", + values: model.Values{ + { + Timestamp: testInstant.Add(-1 * 86 * time.Minute), + Value: 4, + }, + }, + }, + sampleGroup{ + // Moved into Block 2 + fingerprint: "0002-A-2-Z", + values: model.Values{ + { + Timestamp: testInstant.Add(-1 * 85 * 
time.Minute), + Value: 5, + }, + }, + }, + sampleGroup{ + // Moved into Block 2 + fingerprint: "0002-A-2-Z", + values: model.Values{ + { + Timestamp: testInstant.Add(-1 * 84 * time.Minute), + Value: 6, + }, + }, + }, + sampleGroup{ + // Moved into Block 2 + fingerprint: "0002-A-2-Z", + values: model.Values{ + { + Timestamp: testInstant.Add(-1 * 83 * time.Minute), + Value: 7, + }, + }, + }, + sampleGroup{ + // Moved into Block 2 + fingerprint: "0002-A-2-Z", + values: model.Values{ + { + Timestamp: testInstant.Add(-1 * 82 * time.Minute), + Value: 8, + }, + }, + }, + sampleGroup{ + // Moved into Block 2 + fingerprint: "0002-A-2-Z", + values: model.Values{ + { + Timestamp: testInstant.Add(-1 * 81 * time.Minute), + Value: 9, + }, + }, + }, + sampleGroup{ + // Moved into Block 3 + fingerprint: "0002-A-2-Z", + values: model.Values{ + { + Timestamp: testInstant.Add(-1 * 80 * time.Minute), + Value: 10, + }, + }, + }, + sampleGroup{ + // Moved into Block 3 + fingerprint: "0002-A-2-Z", + values: model.Values{ + { + Timestamp: testInstant.Add(-1 * 79 * time.Minute), + Value: 11, + }, + }, + }, + sampleGroup{ + // Moved into Block 3 + fingerprint: "0002-A-2-Z", + values: model.Values{ + { + Timestamp: testInstant.Add(-1 * 78 * time.Minute), + Value: 12, + }, + }, + }, + sampleGroup{ + // Moved into Block 3 + fingerprint: "0002-A-2-Z", + values: model.Values{ + { + Timestamp: testInstant.Add(-1 * 77 * time.Minute), + Value: 13, + }, + }, + }, + sampleGroup{ + // Moved into Blocks 3 and 4 and 5 + fingerprint: "0002-A-2-Z", + values: model.Values{ + { + // Block 3 + Timestamp: testInstant.Add(-1 * 76 * time.Minute), + Value: 14, + }, + { + // Block 4 + Timestamp: testInstant.Add(-1 * 75 * time.Minute), + Value: 15, + }, + { + // Block 4 + Timestamp: testInstant.Add(-1 * 74 * time.Minute), + Value: 16, + }, + { + // Block 4 + Timestamp: testInstant.Add(-1 * 73 * time.Minute), + Value: 17, + }, + { + // Block 4 + Timestamp: testInstant.Add(-1 * 72 * time.Minute), + Value: 18, + }, + { + // Block 4 + Timestamp: testInstant.Add(-1 * 71 * time.Minute), + Value: 19, + }, + { + // Block 5 + Timestamp: testInstant.Add(-1 * 70 * time.Minute), + Value: 20, + }, + }, + }, + sampleGroup{ + // Moved into Block 5 + fingerprint: "0002-A-2-Z", + values: model.Values{ + { + Timestamp: testInstant.Add(-1 * 69 * time.Minute), + Value: 21, + }, + }, + }, + sampleGroup{ + // Moved into Block 5 + fingerprint: "0002-A-2-Z", + values: model.Values{ + { + Timestamp: testInstant.Add(-1 * 68 * time.Minute), + Value: 22, + }, + }, + }, + sampleGroup{ + // Moved into Block 5 + fingerprint: "0002-A-2-Z", + values: model.Values{ + { + Timestamp: testInstant.Add(-1 * 67 * time.Minute), + Value: 23, + }, + }, + }, + sampleGroup{ + // Moved into Block 5 + fingerprint: "0002-A-2-Z", + values: model.Values{ + { + Timestamp: testInstant.Add(-1 * 66 * time.Minute), + Value: 24, + }, + }, + }, + sampleGroup{ + // Moved into Block 6 + fingerprint: "0002-A-2-Z", + values: model.Values{ + { + Timestamp: testInstant.Add(-1 * 65 * time.Minute), + Value: 25, + }, + }, + }, + sampleGroup{ + // Moved into Block 6 + fingerprint: "0002-A-2-Z", + values: model.Values{ + { + Timestamp: testInstant.Add(-1 * 64 * time.Minute), + Value: 26, + }, + }, + }, + sampleGroup{ + // Moved into Block 6 + fingerprint: "0002-A-2-Z", + values: model.Values{ + { + Timestamp: testInstant.Add(-1 * 63 * time.Minute), + Value: 27, + }, + }, + }, + sampleGroup{ + // Moved into Block 6 + fingerprint: "0002-A-2-Z", + values: model.Values{ + { + Timestamp: testInstant.Add(-1 * 
62 * time.Minute), + Value: 28, + }, + }, + }, + sampleGroup{ + // Moved into Block 6 + fingerprint: "0002-A-2-Z", + values: model.Values{ + { + Timestamp: testInstant.Add(-1 * 61 * time.Minute), + Value: 29, + }, + }, + }, + sampleGroup{ + // Moved into Block 7 + fingerprint: "0002-A-2-Z", + values: model.Values{ + { + Timestamp: testInstant.Add(-1 * 60 * time.Minute), + Value: 30, + }, + }, + }, + }, + }, + out: out{ + curationStates: []curationState{ + { + fingerprint: "0001-A-1-Z", + ignoreYoungerThan: time.Hour, + lastCurated: testInstant.Add(-1 * 30 * time.Minute), + processor: &compactionProcessor{ + MinimumGroupSize: 5, + MaximumMutationPoolBatch: 15, + }, + }, + { + fingerprint: "0002-A-2-Z", + ignoreYoungerThan: 30 * time.Minute, + lastCurated: testInstant.Add(-1 * 90 * time.Minute), + processor: &compactionProcessor{ + MinimumGroupSize: 2, + MaximumMutationPoolBatch: 15, + }, + }, + { + fingerprint: "0002-A-2-Z", + ignoreYoungerThan: time.Hour, + lastCurated: testInstant.Add(-1 * 60 * time.Minute), + processor: &compactionProcessor{ + MinimumGroupSize: 5, + MaximumMutationPoolBatch: 15, + }, + }, + }, + sampleGroups: []sampleGroup{ + { + fingerprint: "0001-A-1-Z", + values: model.Values{ + { + Timestamp: testInstant.Add(-1 * 90 * time.Minute), + Value: 0, + }, + { + Timestamp: testInstant.Add(-1 * 85 * time.Minute), + Value: 1, + }, + { + Timestamp: testInstant.Add(-1 * 80 * time.Minute), + Value: 2, + }, + { + Timestamp: testInstant.Add(-1 * 75 * time.Minute), + Value: 3, + }, + { + Timestamp: testInstant.Add(-1 * 70 * time.Minute), + Value: 4, + }, + }, + }, + { + fingerprint: "0001-A-1-Z", + values: model.Values{ + { + Timestamp: testInstant.Add(-1 * 65 * time.Minute), + Value: 0.25, + }, + { + Timestamp: testInstant.Add(-1 * 60 * time.Minute), + Value: 1.25, + }, + { + Timestamp: testInstant.Add(-1 * 55 * time.Minute), + Value: 2.25, + }, + { + Timestamp: testInstant.Add(-1 * 50 * time.Minute), + Value: 3.25, + }, + { + Timestamp: testInstant.Add(-1 * 45 * time.Minute), + Value: 4.25, + }, + }, + }, + { + fingerprint: "0001-A-1-Z", + values: model.Values{ + { + Timestamp: testInstant.Add(-1 * 40 * time.Minute), + Value: 0.50, + }, + { + Timestamp: testInstant.Add(-1 * 35 * time.Minute), + Value: 1.50, + }, + { + Timestamp: testInstant.Add(-1 * 30 * time.Minute), + Value: 2.50, + }, + }, + }, + { + fingerprint: "0001-A-1-Z", + values: model.Values{ + { + Timestamp: testInstant.Add(-1 * 25 * time.Minute), + Value: 0.75, + }, + }, + }, + { + fingerprint: "0001-A-1-Z", + values: model.Values{ + { + Timestamp: testInstant.Add(-1 * 20 * time.Minute), + Value: -2, + }, + }, + }, + { + fingerprint: "0001-A-1-Z", + values: model.Values{ + { + Timestamp: testInstant.Add(-1 * 15 * time.Minute), + Value: -3, + }, + }, + }, + { + // Block 1 + fingerprint: "0002-A-2-Z", + values: model.Values{ + { + Timestamp: testInstant.Add(-1 * 90 * time.Minute), + Value: 0, + }, + { + Timestamp: testInstant.Add(-1 * 89 * time.Minute), + Value: 1, + }, + { + Timestamp: testInstant.Add(-1 * 88 * time.Minute), + Value: 2, + }, + { + Timestamp: testInstant.Add(-1 * 87 * time.Minute), + Value: 3, + }, + { + Timestamp: testInstant.Add(-1 * 86 * time.Minute), + Value: 4, + }, + }, + }, + { + // Block 2 + fingerprint: "0002-A-2-Z", + values: model.Values{ + { + Timestamp: testInstant.Add(-1 * 85 * time.Minute), + Value: 5, + }, + { + Timestamp: testInstant.Add(-1 * 84 * time.Minute), + Value: 6, + }, + { + Timestamp: testInstant.Add(-1 * 83 * time.Minute), + Value: 7, + }, + { + Timestamp: testInstant.Add(-1 
* 82 * time.Minute), + Value: 8, + }, + { + Timestamp: testInstant.Add(-1 * 81 * time.Minute), + Value: 9, + }, + }, + }, + { + // Block 3 + fingerprint: "0002-A-2-Z", + values: model.Values{ + { + Timestamp: testInstant.Add(-1 * 80 * time.Minute), + Value: 10, + }, + { + Timestamp: testInstant.Add(-1 * 79 * time.Minute), + Value: 11, + }, + { + Timestamp: testInstant.Add(-1 * 78 * time.Minute), + Value: 12, + }, + { + Timestamp: testInstant.Add(-1 * 77 * time.Minute), + Value: 13, + }, + { + Timestamp: testInstant.Add(-1 * 76 * time.Minute), + Value: 14, + }, + }, + }, + { + fingerprint: "0002-A-2-Z", + values: model.Values{ + { + Timestamp: testInstant.Add(-1 * 75 * time.Minute), + Value: 15, + }, + { + Timestamp: testInstant.Add(-1 * 74 * time.Minute), + Value: 16, + }, + { + Timestamp: testInstant.Add(-1 * 73 * time.Minute), + Value: 17, + }, + { + Timestamp: testInstant.Add(-1 * 72 * time.Minute), + Value: 18, + }, + { + Timestamp: testInstant.Add(-1 * 71 * time.Minute), + Value: 19, + }, + }, + }, + { + fingerprint: "0002-A-2-Z", + values: model.Values{ + { + Timestamp: testInstant.Add(-1 * 70 * time.Minute), + Value: 20, + }, + { + Timestamp: testInstant.Add(-1 * 69 * time.Minute), + Value: 21, + }, + { + Timestamp: testInstant.Add(-1 * 68 * time.Minute), + Value: 22, + }, + { + Timestamp: testInstant.Add(-1 * 67 * time.Minute), + Value: 23, + }, + { + Timestamp: testInstant.Add(-1 * 66 * time.Minute), + Value: 24, + }, + }, + }, + { + fingerprint: "0002-A-2-Z", + values: model.Values{ + { + Timestamp: testInstant.Add(-1 * 65 * time.Minute), + Value: 25, + }, + { + Timestamp: testInstant.Add(-1 * 64 * time.Minute), + Value: 26, + }, + { + Timestamp: testInstant.Add(-1 * 63 * time.Minute), + Value: 27, + }, + { + Timestamp: testInstant.Add(-1 * 62 * time.Minute), + Value: 28, + }, + { + Timestamp: testInstant.Add(-1 * 61 * time.Minute), + Value: 29, + }, + }, + }, + { + fingerprint: "0002-A-2-Z", + values: model.Values{ + { + Timestamp: testInstant.Add(-1 * 60 * time.Minute), + Value: 30, + }, + }, + }, + }, + }, + }, + } + + for i, scenario := range scenarios { + curatorDirectory := fixture.NewPreparer(t).Prepare("curator", fixture.NewCassetteFactory(scenario.in.curationStates)) + defer curatorDirectory.Close() + + watermarkDirectory := fixture.NewPreparer(t).Prepare("watermark", fixture.NewCassetteFactory(scenario.in.watermarkStates)) + defer watermarkDirectory.Close() + + sampleDirectory := fixture.NewPreparer(t).Prepare("sample", fixture.NewCassetteFactory(scenario.in.sampleGroups)) + defer sampleDirectory.Close() + + curatorStates, err := leveldb.NewLevelDBPersistence(curatorDirectory.Path(), 0, 0) + if err != nil { + t.Fatal(err) + } + defer curatorStates.Close() + + watermarkStates, err := leveldb.NewLevelDBPersistence(watermarkDirectory.Path(), 0, 0) + if err != nil { + t.Fatal(err) + } + defer watermarkStates.Close() + + samples, err := leveldb.NewLevelDBPersistence(sampleDirectory.Path(), 0, 0) + if err != nil { + t.Fatal(err) + } + defer samples.Close() + + c := newCurator() + err = c.run(scenario.in.ignoreYoungerThan, testInstant, scenario.in.processor, curatorStates, samples, watermarkStates) + if err != nil { + t.Fatal(err) + } + + iterator := curatorStates.NewIterator(true) + defer iterator.Close() + + for j, expected := range scenario.out.curationStates { + switch j { + case 0: + if !iterator.SeekToFirst() { + t.Fatalf("%d.%d. could not seek to beginning.", i, j) + } + default: + if !iterator.Next() { + t.Fatalf("%d.%d. 
could not seek to next.", i, j) + } + } + + curationKeyDto := &dto.CurationKey{} + curationValueDto := &dto.CurationValue{} + + err = proto.Unmarshal(iterator.Key(), curationKeyDto) + if err != nil { + t.Fatalf("%d.%d. could not unmarshal: %s\n", i, j, err) + } + err = proto.Unmarshal(iterator.Value(), curationValueDto) + if err != nil { + t.Fatalf("%d.%d. could not unmarshal: %s\n", i, j, err) + } + + curationKey := model.NewCurationKeyFromDTO(curationKeyDto) + actualCurationRemark := model.NewCurationRemarkFromDTO(curationValueDto) + + signature, err := expected.processor.Signature() + if err != nil { + t.Fatal(err) + } + + actualKey := curationKey + expectedKey := model.CurationKey{ + Fingerprint: model.NewFingerprintFromRowKey(expected.fingerprint), + IgnoreYoungerThan: expected.ignoreYoungerThan, + ProcessorMessageRaw: signature, + ProcessorMessageTypeName: expected.processor.Name(), + } + if !actualKey.Equal(expectedKey) { + t.Fatalf("%d.%d. expected %s, got %s", i, j, expectedKey, actualKey) + } + expectedCurationRemark := model.CurationRemark{ + LastCompletionTimestamp: expected.lastCurated, + } + if !actualCurationRemark.Equal(expectedCurationRemark) { + t.Fatalf("%d.%d. expected %s, got %s", i, j, expectedCurationRemark, actualCurationRemark) + } + } + + iterator = samples.NewIterator(true) + defer iterator.Close() + + for j, expected := range scenario.out.sampleGroups { + switch j { + case 0: + if !iterator.SeekToFirst() { + t.Fatalf("%d.%d. could not seek to beginning.", i, j) + } + default: + if !iterator.Next() { + t.Fatalf("%d.%d. could not seek to next, expected %s", i, j, expected) + } + } + + sampleKey, err := extractSampleKey(iterator) + if err != nil { + t.Fatalf("%d.%d. error %s", i, j, err) + } + sampleValues, err := extractSampleValues(iterator) + if err != nil { + t.Fatalf("%d.%d. error %s", i, j, err) + } + + if !model.NewFingerprintFromRowKey(expected.fingerprint).Equal(sampleKey.Fingerprint) { + t.Fatalf("%d.%d. expected fingerprint %s, got %s", i, j, expected.fingerprint, sampleKey.Fingerprint) + } + + if int(sampleKey.SampleCount) != len(expected.values) { + t.Fatalf("%d.%d. expected %d values, got %d", i, j, len(expected.values), sampleKey.SampleCount) + } + + if len(sampleValues) != len(expected.values) { + t.Fatalf("%d.%d. expected %d values, got %d", i, j, len(expected.values), len(sampleValues)) + } + + if !sampleKey.FirstTimestamp.Equal(expected.values[0].Timestamp) { + t.Fatalf("%d.%d. expected %s, got %s", i, j, expected.values[0].Timestamp, sampleKey.FirstTimestamp) + } + + for k, actualValue := range sampleValues { + if expected.values[k].Value != actualValue.Value { + t.Fatalf("%d.%d.%d. expected %d, got %d", i, j, k, expected.values[k].Value, actualValue.Value) + } + if !expected.values[k].Timestamp.Equal(actualValue.Timestamp) { + t.Fatalf("%d.%d.%d. expected %s, got %s", i, j, k, expected.values[k].Timestamp, actualValue.Timestamp) + } + } + + if !sampleKey.LastTimestamp.Equal(expected.values[len(expected.values)-1].Timestamp) { + fmt.Println("last", sampleValues[len(expected.values)-1].Value, expected.values[len(expected.values)-1].Value) + t.Errorf("%d.%d. 
expected %s, got %s", i, j, expected.values[len(expected.values)-1].Timestamp, sampleKey.LastTimestamp) + } + } + } +} diff --git a/storage/raw/leveldb/batch.go b/storage/raw/leveldb/batch.go index aec4ac547..acec9eaf6 100644 --- a/storage/raw/leveldb/batch.go +++ b/storage/raw/leveldb/batch.go @@ -14,30 +14,34 @@ package leveldb import ( + "fmt" "github.com/jmhodges/levigo" "github.com/prometheus/prometheus/coding" ) type batch struct { batch *levigo.WriteBatch + drops uint32 + puts uint32 } -func NewBatch() batch { - return batch{ +func NewBatch() *batch { + return &batch{ batch: levigo.NewWriteBatch(), } } -func (b batch) Drop(key coding.Encoder) { +func (b *batch) Drop(key coding.Encoder) { keyEncoded, err := key.Encode() if err != nil { panic(err) } + b.drops++ b.batch.Delete(keyEncoded) } -func (b batch) Put(key, value coding.Encoder) { +func (b *batch) Put(key, value coding.Encoder) { keyEncoded, err := key.Encode() if err != nil { panic(err) @@ -46,6 +50,7 @@ func (b batch) Put(key, value coding.Encoder) { if err != nil { panic(err) } + b.puts++ b.batch.Put(keyEncoded, valueEncoded) } @@ -53,3 +58,7 @@ func (b batch) Put(key, value coding.Encoder) { func (b batch) Close() { b.batch.Close() } + +func (b batch) String() string { + return fmt.Sprintf("LevelDB batch with %d puts and %d drops.", b.puts, b.drops) +} diff --git a/storage/raw/leveldb/leveldb.go b/storage/raw/leveldb/leveldb.go index 901913196..17367345b 100644 --- a/storage/raw/leveldb/leveldb.go +++ b/storage/raw/leveldb/leveldb.go @@ -298,7 +298,7 @@ func (l *LevelDBPersistence) Commit(b raw.Batch) (err error) { // tests, we could create a Batch struct that journals pending // operations which the given Persistence implementation could convert // to its specific commit requirements. - batch, ok := b.(batch) + batch, ok := b.(*batch) if !ok { panic("leveldb.batch expected") }