2013-03-21 10:29:33 -07:00
|
|
|
// Copyright 2013 Prometheus Team
|
|
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
|
|
// you may not use this file except in compliance with the License.
|
|
|
|
// You may obtain a copy of the License at
|
|
|
|
//
|
|
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
//
|
|
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
|
// See the License for the specific language governing permissions and
|
|
|
|
// limitations under the License.
|
|
|
|
|
|
|
|
package metric
|
|
|
|
|
|
|
|
import (
|
2013-06-25 05:02:27 -07:00
|
|
|
"bytes"
|
2013-08-29 06:15:22 -07:00
|
|
|
"errors"
|
2013-04-05 09:03:45 -07:00
|
|
|
"fmt"
|
2013-06-25 05:02:27 -07:00
|
|
|
"strings"
|
|
|
|
"time"
|
|
|
|
|
|
|
|
"code.google.com/p/goprotobuf/proto"
|
2013-08-22 08:40:23 -07:00
|
|
|
"github.com/golang/glog"
|
2013-06-25 05:02:27 -07:00
|
|
|
|
|
|
|
clientmodel "github.com/prometheus/client_golang/model"
|
|
|
|
|
2013-03-21 10:29:33 -07:00
|
|
|
"github.com/prometheus/prometheus/storage"
|
|
|
|
"github.com/prometheus/prometheus/storage/raw"
|
2013-04-05 09:03:45 -07:00
|
|
|
"github.com/prometheus/prometheus/storage/raw/leveldb"
|
2013-08-12 08:18:02 -07:00
|
|
|
|
|
|
|
dto "github.com/prometheus/prometheus/model/generated"
|
2013-03-21 10:29:33 -07:00
|
|
|
)
|
|
|
|
|
2013-08-26 10:12:43 -07:00
|
|
|
// curationYieldPeriod is how long watermarkScanner.Operate sleeps before
// continuing when it finds pending jobs in the view queue.
const curationYieldPeriod = 250 * time.Millisecond
|
|
|
|
|
2014-02-14 10:36:27 -08:00
|
|
|
// errIllegalIterator is reported by watermarkScanner.Operate when the seek
// state machine marks the sample iterator as invalid.
var errIllegalIterator = errors.New("iterator invalid")
|
2013-08-29 06:15:22 -07:00
|
|
|
|
2013-08-13 08:19:13 -07:00
|
|
|
// CurationStateUpdater receives updates about the curation state.
|
|
|
|
type CurationStateUpdater interface {
|
|
|
|
UpdateCurationState(*CurationState)
|
|
|
|
}
|
|
|
|
|
2013-04-28 10:01:56 -07:00
|
|
|
// CurationState contains high-level curation state information for the
|
|
|
|
// heads-up-display.
|
|
|
|
type CurationState struct {
|
|
|
|
Active bool
|
|
|
|
Name string
|
|
|
|
Limit time.Duration
|
2013-06-25 05:02:27 -07:00
|
|
|
Fingerprint *clientmodel.Fingerprint
|
2013-04-05 09:03:45 -07:00
|
|
|
}
|
|
|
|
|
2014-02-14 10:36:27 -08:00
|
|
|
// CuratorOptions bundles the parameters needed to create a Curator.
|
2013-08-29 06:15:22 -07:00
|
|
|
type CuratorOptions struct {
|
2014-04-14 14:34:17 -07:00
|
|
|
Stop chan struct{}
|
2013-08-26 10:12:43 -07:00
|
|
|
|
|
|
|
ViewQueue chan viewJob
|
2013-04-05 09:03:45 -07:00
|
|
|
}
|
|
|
|
|
2014-02-14 10:36:27 -08:00
|
|
|
// Curator is responsible for effectuating a given curation policy across the
|
2013-08-29 06:15:22 -07:00
|
|
|
// stored samples on-disk. This is useful to compact sparse sample values into
|
|
|
|
// single sample entities to reduce keyspace load on the datastore.
|
|
|
|
type Curator struct {
|
2014-04-14 14:34:17 -07:00
|
|
|
stop chan struct{}
|
2013-08-29 06:15:22 -07:00
|
|
|
|
|
|
|
viewQueue chan viewJob
|
|
|
|
|
|
|
|
dtoSampleKeys *dtoSampleKeyList
|
|
|
|
sampleKeys *sampleKeyList
|
|
|
|
}
|
|
|
|
|
2014-02-14 10:36:27 -08:00
|
|
|
// NewCurator returns an initialized Curator.
|
2013-08-29 06:15:22 -07:00
|
|
|
func NewCurator(o *CuratorOptions) *Curator {
|
|
|
|
return &Curator{
|
|
|
|
stop: o.Stop,
|
|
|
|
|
|
|
|
viewQueue: o.ViewQueue,
|
|
|
|
|
|
|
|
dtoSampleKeys: newDtoSampleKeyList(10),
|
|
|
|
sampleKeys: newSampleKeyList(10),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2013-06-25 05:02:27 -07:00
|
|
|
// watermarkScanner converts (dto.Fingerprint, dto.MetricHighWatermark) doubles
|
2013-04-05 09:03:45 -07:00
|
|
|
// into (model.Fingerprint, model.Watermark) doubles.
|
2013-06-25 05:02:27 -07:00
|
|
|
//
|
|
|
|
// watermarkScanner determines whether to include or exclude candidate
|
|
|
|
// values from the curation process by virtue of how old the high watermark is.
|
|
|
|
//
|
|
|
|
// watermarkScanner scans over the curator.samples table for metrics whose
|
2013-04-05 09:03:45 -07:00
|
|
|
// high watermark has been determined to be allowable for curation. This type
|
|
|
|
// is individually responsible for compaction.
|
|
|
|
//
|
|
|
|
// The scanning starts from CurationRemark.LastCompletionTimestamp and goes
|
|
|
|
// forward until the stop point or end of the series is reached.
|
2013-06-25 05:02:27 -07:00
|
|
|
type watermarkScanner struct {
|
2013-04-05 09:03:45 -07:00
|
|
|
// curationState is the data store for curation remarks.
|
2013-08-06 03:00:31 -07:00
|
|
|
curationState CurationRemarker
|
2013-04-05 09:03:45 -07:00
|
|
|
// ignoreYoungerThan is passed into the curation remark for the given series.
|
|
|
|
ignoreYoungerThan time.Duration
|
|
|
|
// processor is responsible for executing a given stategy on the
|
|
|
|
// to-be-operated-on series.
|
2013-05-02 03:37:24 -07:00
|
|
|
processor Processor
|
2013-04-05 09:03:45 -07:00
|
|
|
// sampleIterator is a snapshotted iterator for the time series.
|
|
|
|
sampleIterator leveldb.Iterator
|
|
|
|
// samples
|
|
|
|
samples raw.Persistence
|
|
|
|
// stopAt is a cue for when to stop mutating a given series.
|
Use custom timestamp type for sample timestamps and related code.
So far we've been using Go's native time.Time for anything related to sample
timestamps. Since the range of time.Time is much bigger than what we need, this
has created two problems:
- there could be time.Time values which were out of the range/precision of the
time type that we persist to disk, therefore causing incorrectly ordered keys.
One bug caused by this was:
https://github.com/prometheus/prometheus/issues/367
It would be good to use a timestamp type that's more closely aligned with
what the underlying storage supports.
- sizeof(time.Time) is 192, while Prometheus should be ok with a single 64-bit
Unix timestamp (possibly even a 32-bit one). Since we store samples in large
numbers, this seriously affects memory usage. Furthermore, copying/working
with the data will be faster if it's smaller.
*MEMORY USAGE RESULTS*
Initial memory usage comparisons for a running Prometheus with 1 timeseries and
100,000 samples show roughly a 13% decrease in total (VIRT) memory usage. In my
tests, this advantage for some reason decreased a bit the more samples the
timeseries had (to 5-7% for millions of samples). This I can't fully explain,
but perhaps garbage collection issues were involved.
*WHEN TO USE THE NEW TIMESTAMP TYPE*
The new clientmodel.Timestamp type should be used whenever time
calculations are either directly or indirectly related to sample
timestamps.
For example:
- the timestamp of a sample itself
- all kinds of watermarks
- anything that may become or is compared to a sample timestamp (like the timestamp
passed into Target.Scrape()).
When to still use time.Time:
- for measuring durations/times not related to sample timestamps, like duration
telemetry exporting, timers that indicate how frequently to execute some
action, etc.
*NOTE ON OPERATOR OPTIMIZATION TESTS*
We don't use operator optimization code anymore, but it still lives in
the code as dead code. It still has tests, but I couldn't get all of them to
pass with the new timestamp format. I commented out the failing cases for now,
but we should probably remove the dead code soon. I just didn't want to do that
in the same change as this.
Change-Id: I821787414b0debe85c9fffaeb57abd453727af0f
2013-10-28 06:35:02 -07:00
|
|
|
stopAt clientmodel.Timestamp
|
2013-06-25 05:02:27 -07:00
|
|
|
|
|
|
|
// stop functions as the global stop channel for all future operations.
|
2014-04-14 14:34:17 -07:00
|
|
|
stop chan struct{}
|
2013-06-25 05:02:27 -07:00
|
|
|
// status is the outbound channel for notifying the status page of its state.
|
2013-08-13 08:19:13 -07:00
|
|
|
status CurationStateUpdater
|
2013-08-22 08:40:23 -07:00
|
|
|
|
|
|
|
firstBlock, lastBlock *SampleKey
|
2013-08-26 10:12:43 -07:00
|
|
|
|
|
|
|
ViewQueue chan viewJob
|
2013-08-29 06:15:22 -07:00
|
|
|
|
|
|
|
dtoSampleKeys *dtoSampleKeyList
|
|
|
|
sampleKeys *sampleKeyList
|
2013-03-21 10:29:33 -07:00
|
|
|
}
|
|
|
|
|
2014-02-14 10:36:27 -08:00
|
|
|
// Run facilitates the curation lifecycle.
|
2013-04-05 09:03:45 -07:00
|
|
|
//
|
|
|
|
// recencyThreshold represents the most recent time up to which values will be
|
|
|
|
// curated.
|
|
|
|
// curationState is the on-disk store where the curation remarks are made for
|
|
|
|
// how much progress has been made.
|
Use custom timestamp type for sample timestamps and related code.
So far we've been using Go's native time.Time for anything related to sample
timestamps. Since the range of time.Time is much bigger than what we need, this
has created two problems:
- there could be time.Time values which were out of the range/precision of the
time type that we persist to disk, therefore causing incorrectly ordered keys.
One bug caused by this was:
https://github.com/prometheus/prometheus/issues/367
It would be good to use a timestamp type that's more closely aligned with
what the underlying storage supports.
- sizeof(time.Time) is 192, while Prometheus should be ok with a single 64-bit
Unix timestamp (possibly even a 32-bit one). Since we store samples in large
numbers, this seriously affects memory usage. Furthermore, copying/working
with the data will be faster if it's smaller.
*MEMORY USAGE RESULTS*
Initial memory usage comparisons for a running Prometheus with 1 timeseries and
100,000 samples show roughly a 13% decrease in total (VIRT) memory usage. In my
tests, this advantage for some reason decreased a bit the more samples the
timeseries had (to 5-7% for millions of samples). This I can't fully explain,
but perhaps garbage collection issues were involved.
*WHEN TO USE THE NEW TIMESTAMP TYPE*
The new clientmodel.Timestamp type should be used whenever time
calculations are either directly or indirectly related to sample
timestamps.
For example:
- the timestamp of a sample itself
- all kinds of watermarks
- anything that may become or is compared to a sample timestamp (like the timestamp
passed into Target.Scrape()).
When to still use time.Time:
- for measuring durations/times not related to sample timestamps, like duration
telemetry exporting, timers that indicate how frequently to execute some
action, etc.
*NOTE ON OPERATOR OPTIMIZATION TESTS*
We don't use operator optimization code anymore, but it still lives in
the code as dead code. It still has tests, but I couldn't get all of them to
pass with the new timestamp format. I commented out the failing cases for now,
but we should probably remove the dead code soon. I just didn't want to do that
in the same change as this.
Change-Id: I821787414b0debe85c9fffaeb57abd453727af0f
2013-10-28 06:35:02 -07:00
|
|
|
func (c *Curator) Run(ignoreYoungerThan time.Duration, instant clientmodel.Timestamp, processor Processor, curationState CurationRemarker, samples *leveldb.LevelDBPersistence, watermarks HighWatermarker, status CurationStateUpdater) (err error) {
|
2013-04-05 09:03:45 -07:00
|
|
|
defer func(t time.Time) {
|
2013-05-07 08:14:04 -07:00
|
|
|
duration := float64(time.Since(t) / time.Millisecond)
|
2013-04-05 09:03:45 -07:00
|
|
|
|
|
|
|
labels := map[string]string{
|
|
|
|
cutOff: fmt.Sprint(ignoreYoungerThan),
|
|
|
|
processorName: processor.Name(),
|
|
|
|
result: success,
|
|
|
|
}
|
|
|
|
if err != nil {
|
|
|
|
labels[result] = failure
|
|
|
|
}
|
|
|
|
|
|
|
|
curationDuration.IncrementBy(labels, duration)
|
|
|
|
curationDurations.Add(labels, duration)
|
|
|
|
}(time.Now())
|
2013-08-13 08:19:13 -07:00
|
|
|
|
|
|
|
defer status.UpdateCurationState(&CurationState{Active: false})
|
2013-04-05 09:03:45 -07:00
|
|
|
|
2013-08-29 06:15:22 -07:00
|
|
|
iterator, err := samples.NewIterator(true)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2013-04-05 09:03:45 -07:00
|
|
|
defer iterator.Close()
|
|
|
|
|
2013-08-22 08:40:23 -07:00
|
|
|
if !iterator.SeekToLast() {
|
|
|
|
glog.Info("Empty database; skipping curation.")
|
|
|
|
|
2013-04-05 09:03:45 -07:00
|
|
|
return
|
|
|
|
}
|
2013-08-29 06:15:22 -07:00
|
|
|
|
|
|
|
keyDto, _ := c.dtoSampleKeys.Get()
|
|
|
|
defer c.dtoSampleKeys.Give(keyDto)
|
|
|
|
|
|
|
|
lastBlock, _ := c.sampleKeys.Get()
|
|
|
|
defer c.sampleKeys.Give(lastBlock)
|
|
|
|
|
|
|
|
if err := iterator.Key(keyDto); err != nil {
|
|
|
|
panic(err)
|
|
|
|
}
|
|
|
|
|
|
|
|
lastBlock.Load(keyDto)
|
2013-08-22 08:40:23 -07:00
|
|
|
|
|
|
|
if !iterator.SeekToFirst() {
|
|
|
|
glog.Info("Empty database; skipping curation.")
|
|
|
|
|
2013-04-05 09:03:45 -07:00
|
|
|
return
|
|
|
|
}
|
2013-08-29 06:15:22 -07:00
|
|
|
|
|
|
|
firstBlock, _ := c.sampleKeys.Get()
|
|
|
|
defer c.sampleKeys.Give(firstBlock)
|
|
|
|
|
|
|
|
if err := iterator.Key(keyDto); err != nil {
|
|
|
|
panic(err)
|
|
|
|
}
|
|
|
|
|
|
|
|
firstBlock.Load(keyDto)
|
2013-04-05 09:03:45 -07:00
|
|
|
|
2013-06-25 05:02:27 -07:00
|
|
|
scanner := &watermarkScanner{
|
2013-04-05 09:03:45 -07:00
|
|
|
curationState: curationState,
|
|
|
|
ignoreYoungerThan: ignoreYoungerThan,
|
2013-04-28 10:01:56 -07:00
|
|
|
processor: processor,
|
|
|
|
status: status,
|
2013-08-29 06:15:22 -07:00
|
|
|
stop: c.stop,
|
2013-04-05 09:03:45 -07:00
|
|
|
stopAt: instant.Add(-1 * ignoreYoungerThan),
|
2013-06-25 05:02:27 -07:00
|
|
|
|
|
|
|
sampleIterator: iterator,
|
|
|
|
samples: samples,
|
2013-08-22 08:40:23 -07:00
|
|
|
|
|
|
|
firstBlock: firstBlock,
|
|
|
|
lastBlock: lastBlock,
|
2013-08-26 10:12:43 -07:00
|
|
|
|
2013-08-29 06:15:22 -07:00
|
|
|
ViewQueue: c.viewQueue,
|
|
|
|
|
|
|
|
dtoSampleKeys: c.dtoSampleKeys,
|
|
|
|
sampleKeys: c.sampleKeys,
|
2013-04-05 04:07:13 -07:00
|
|
|
}
|
2013-04-05 09:03:45 -07:00
|
|
|
|
|
|
|
// Right now, the ability to stop a curation is limited to the beginning of
|
|
|
|
// each fingerprint cycle. It is impractical to cease the work once it has
|
|
|
|
// begun for a given series.
|
2013-06-25 05:02:27 -07:00
|
|
|
_, err = watermarks.ForEach(scanner, scanner, scanner)
|
2013-03-21 10:29:33 -07:00
|
|
|
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
2014-02-14 10:36:27 -08:00
|
|
|
// Close needs to be called to cleanly dispose of a curator.
|
2013-08-29 06:15:22 -07:00
|
|
|
func (c *Curator) Close() {
|
|
|
|
c.dtoSampleKeys.Close()
|
|
|
|
c.sampleKeys.Close()
|
|
|
|
}
|
|
|
|
|
2013-06-25 05:02:27 -07:00
|
|
|
func (w *watermarkScanner) DecodeKey(in interface{}) (interface{}, error) {
|
2014-02-14 10:36:27 -08:00
|
|
|
key := &dto.Fingerprint{}
|
2013-04-05 09:03:45 -07:00
|
|
|
bytes := in.([]byte)
|
2013-03-21 10:29:33 -07:00
|
|
|
|
2013-06-25 05:02:27 -07:00
|
|
|
if err := proto.Unmarshal(bytes, key); err != nil {
|
|
|
|
return nil, err
|
2013-03-21 10:29:33 -07:00
|
|
|
}
|
|
|
|
|
2014-02-14 10:36:27 -08:00
|
|
|
fingerprint := &clientmodel.Fingerprint{}
|
2013-06-25 05:02:27 -07:00
|
|
|
loadFingerprint(fingerprint, key)
|
2013-03-21 10:29:33 -07:00
|
|
|
|
2013-06-25 05:02:27 -07:00
|
|
|
return fingerprint, nil
|
2013-03-21 10:29:33 -07:00
|
|
|
}
|
|
|
|
|
2013-06-25 05:02:27 -07:00
|
|
|
func (w *watermarkScanner) DecodeValue(in interface{}) (interface{}, error) {
|
2014-02-14 10:36:27 -08:00
|
|
|
value := &dto.MetricHighWatermark{}
|
2013-04-05 09:03:45 -07:00
|
|
|
bytes := in.([]byte)
|
2013-03-21 10:29:33 -07:00
|
|
|
|
2013-06-25 05:02:27 -07:00
|
|
|
if err := proto.Unmarshal(bytes, value); err != nil {
|
|
|
|
return nil, err
|
2013-03-21 10:29:33 -07:00
|
|
|
}
|
|
|
|
|
2014-02-14 10:36:27 -08:00
|
|
|
watermark := &watermarks{}
|
2013-06-25 05:02:27 -07:00
|
|
|
watermark.load(value)
|
2013-03-21 10:29:33 -07:00
|
|
|
|
2013-06-25 05:02:27 -07:00
|
|
|
return watermark, nil
|
2013-03-21 10:29:33 -07:00
|
|
|
}
|
|
|
|
|
2013-06-25 05:02:27 -07:00
|
|
|
func (w *watermarkScanner) shouldStop() bool {
|
2014-04-14 14:34:17 -07:00
|
|
|
select {
|
|
|
|
case _, ok := <-w.stop:
|
|
|
|
if ok {
|
|
|
|
panic("channel should be closed only")
|
|
|
|
}
|
|
|
|
return true
|
|
|
|
default:
|
|
|
|
return false
|
|
|
|
}
|
2013-03-21 10:29:33 -07:00
|
|
|
}
|
|
|
|
|
2013-06-25 05:02:27 -07:00
|
|
|
func (w *watermarkScanner) Filter(key, value interface{}) (r storage.FilterResult) {
|
|
|
|
fingerprint := key.(*clientmodel.Fingerprint)
|
2013-04-28 10:01:56 -07:00
|
|
|
|
2013-04-05 09:03:45 -07:00
|
|
|
defer func() {
|
|
|
|
labels := map[string]string{
|
|
|
|
cutOff: fmt.Sprint(w.ignoreYoungerThan),
|
|
|
|
result: strings.ToLower(r.String()),
|
|
|
|
processorName: w.processor.Name(),
|
|
|
|
}
|
2013-03-21 10:29:33 -07:00
|
|
|
|
2013-04-05 09:03:45 -07:00
|
|
|
curationFilterOperations.Increment(labels)
|
|
|
|
|
2013-08-13 08:19:13 -07:00
|
|
|
w.status.UpdateCurationState(&CurationState{
|
2013-04-28 10:01:56 -07:00
|
|
|
Active: true,
|
|
|
|
Name: w.processor.Name(),
|
|
|
|
Limit: w.ignoreYoungerThan,
|
|
|
|
Fingerprint: fingerprint,
|
2013-08-13 08:19:13 -07:00
|
|
|
})
|
2013-04-28 10:01:56 -07:00
|
|
|
}()
|
|
|
|
|
2013-04-05 09:03:45 -07:00
|
|
|
if w.shouldStop() {
|
2014-02-14 10:36:27 -08:00
|
|
|
return storage.Stop
|
2013-03-21 10:29:33 -07:00
|
|
|
}
|
|
|
|
|
2013-06-25 05:02:27 -07:00
|
|
|
k := &curationKey{
|
|
|
|
Fingerprint: fingerprint,
|
|
|
|
ProcessorMessageRaw: w.processor.Signature(),
|
|
|
|
ProcessorMessageTypeName: w.processor.Name(),
|
|
|
|
IgnoreYoungerThan: w.ignoreYoungerThan,
|
|
|
|
}
|
|
|
|
|
2013-08-06 03:00:31 -07:00
|
|
|
curationRemark, present, err := w.curationState.Get(k)
|
2013-04-05 09:03:45 -07:00
|
|
|
if err != nil {
|
2013-03-21 10:29:33 -07:00
|
|
|
return
|
|
|
|
}
|
2013-06-25 05:02:27 -07:00
|
|
|
if !present {
|
2014-02-14 10:36:27 -08:00
|
|
|
return storage.Accept
|
2013-03-21 10:29:33 -07:00
|
|
|
}
|
2013-08-06 03:00:31 -07:00
|
|
|
if !curationRemark.Before(w.stopAt) {
|
2014-02-14 10:36:27 -08:00
|
|
|
return storage.Skip
|
2013-04-05 09:03:45 -07:00
|
|
|
}
|
2013-06-25 05:02:27 -07:00
|
|
|
watermark := value.(*watermarks)
|
2013-08-06 03:00:31 -07:00
|
|
|
if !curationRemark.Before(watermark.High) {
|
2014-02-14 10:36:27 -08:00
|
|
|
return storage.Skip
|
2013-04-05 09:03:45 -07:00
|
|
|
}
|
|
|
|
curationConsistent, err := w.curationConsistent(fingerprint, watermark)
|
|
|
|
if err != nil {
|
2013-03-21 10:29:33 -07:00
|
|
|
return
|
|
|
|
}
|
2013-04-05 09:03:45 -07:00
|
|
|
if curationConsistent {
|
2014-02-14 10:36:27 -08:00
|
|
|
return storage.Skip
|
2013-04-05 09:03:45 -07:00
|
|
|
}
|
2013-03-21 10:29:33 -07:00
|
|
|
|
2014-02-14 10:36:27 -08:00
|
|
|
return storage.Accept
|
2013-04-05 09:03:45 -07:00
|
|
|
}
|
|
|
|
|
|
|
|
// curationConsistent determines whether the given metric is in a dirty state
|
|
|
|
// and needs curation.
|
2013-06-25 05:02:27 -07:00
|
|
|
func (w *watermarkScanner) curationConsistent(f *clientmodel.Fingerprint, watermark *watermarks) (bool, error) {
|
|
|
|
k := &curationKey{
|
|
|
|
Fingerprint: f,
|
|
|
|
ProcessorMessageRaw: w.processor.Signature(),
|
|
|
|
ProcessorMessageTypeName: w.processor.Name(),
|
|
|
|
IgnoreYoungerThan: w.ignoreYoungerThan,
|
|
|
|
}
|
2013-08-06 03:00:31 -07:00
|
|
|
curationRemark, present, err := w.curationState.Get(k)
|
2013-04-05 09:03:45 -07:00
|
|
|
if err != nil {
|
2013-06-25 05:02:27 -07:00
|
|
|
return false, err
|
2013-04-05 09:03:45 -07:00
|
|
|
}
|
2013-06-25 05:02:27 -07:00
|
|
|
if !present {
|
|
|
|
return false, nil
|
|
|
|
}
|
2013-08-06 03:00:31 -07:00
|
|
|
if !curationRemark.Before(watermark.High) {
|
2013-06-25 05:02:27 -07:00
|
|
|
return true, nil
|
2013-04-05 09:03:45 -07:00
|
|
|
}
|
2013-03-21 10:29:33 -07:00
|
|
|
|
2013-06-25 05:02:27 -07:00
|
|
|
return false, nil
|
2013-03-21 10:29:33 -07:00
|
|
|
}
|
|
|
|
|
2013-06-25 05:02:27 -07:00
|
|
|
func (w *watermarkScanner) Operate(key, _ interface{}) (oErr *storage.OperatorError) {
|
2013-08-29 06:15:22 -07:00
|
|
|
fingerprint := key.(*clientmodel.Fingerprint)
|
|
|
|
|
|
|
|
glog.Infof("Curating %s...", fingerprint)
|
|
|
|
|
2013-08-26 10:12:43 -07:00
|
|
|
if len(w.ViewQueue) > 0 {
|
2013-08-29 06:15:22 -07:00
|
|
|
glog.Warning("Deferred due to view queue.")
|
2013-08-26 10:12:43 -07:00
|
|
|
time.Sleep(curationYieldPeriod)
|
|
|
|
}
|
|
|
|
|
2013-08-22 08:40:23 -07:00
|
|
|
if fingerprint.Less(w.firstBlock.Fingerprint) {
|
2013-08-29 06:15:22 -07:00
|
|
|
glog.Warning("Skipped since before keyspace.")
|
2013-08-22 08:40:23 -07:00
|
|
|
return nil
|
|
|
|
}
|
|
|
|
if w.lastBlock.Fingerprint.Less(fingerprint) {
|
2013-08-29 06:15:22 -07:00
|
|
|
glog.Warning("Skipped since after keyspace.")
|
2013-08-22 08:40:23 -07:00
|
|
|
return nil
|
2013-03-21 10:29:33 -07:00
|
|
|
}
|
|
|
|
|
2013-08-29 06:15:22 -07:00
|
|
|
curationState, _, err := w.curationState.Get(&curationKey{
|
2013-06-25 05:02:27 -07:00
|
|
|
Fingerprint: fingerprint,
|
|
|
|
ProcessorMessageRaw: w.processor.Signature(),
|
|
|
|
ProcessorMessageTypeName: w.processor.Name(),
|
|
|
|
IgnoreYoungerThan: w.ignoreYoungerThan,
|
2013-08-06 03:00:31 -07:00
|
|
|
})
|
2013-04-05 09:03:45 -07:00
|
|
|
if err != nil {
|
2013-08-29 06:15:22 -07:00
|
|
|
glog.Warning("Unable to get curation state: %s", err)
|
2013-04-05 09:03:45 -07:00
|
|
|
// An anomaly with the curation remark is likely not fatal in the sense that
|
|
|
|
// there was a decoding error with the entity and shouldn't be cause to stop
|
|
|
|
// work. The process will simply start from a pessimistic work time and
|
|
|
|
// work forward. With an idempotent processor, this is safe.
|
2014-01-21 07:49:51 -08:00
|
|
|
return &storage.OperatorError{Error: err, Continuable: true}
|
2013-04-05 09:03:45 -07:00
|
|
|
}
|
2013-08-22 08:40:23 -07:00
|
|
|
|
2013-08-29 06:15:22 -07:00
|
|
|
keySet, _ := w.sampleKeys.Get()
|
|
|
|
defer w.sampleKeys.Give(keySet)
|
|
|
|
|
|
|
|
keySet.Fingerprint = fingerprint
|
|
|
|
keySet.FirstTimestamp = curationState
|
|
|
|
|
|
|
|
// Invariant: The fingerprint tests above ensure that we have the same
|
|
|
|
// fingerprint.
|
|
|
|
keySet.Constrain(w.firstBlock, w.lastBlock)
|
2013-03-21 10:29:33 -07:00
|
|
|
|
2013-08-29 06:15:22 -07:00
|
|
|
seeker := &iteratorSeekerState{
|
|
|
|
i: w.sampleIterator,
|
|
|
|
|
|
|
|
obj: keySet,
|
|
|
|
|
|
|
|
first: w.firstBlock,
|
|
|
|
last: w.lastBlock,
|
|
|
|
|
|
|
|
dtoSampleKeys: w.dtoSampleKeys,
|
|
|
|
sampleKeys: w.sampleKeys,
|
2013-04-05 09:03:45 -07:00
|
|
|
}
|
2013-03-21 10:29:33 -07:00
|
|
|
|
2013-08-29 06:15:22 -07:00
|
|
|
for state := seeker.initialize; state != nil; state = state() {
|
2013-04-05 09:03:45 -07:00
|
|
|
}
|
|
|
|
|
2013-08-29 06:15:22 -07:00
|
|
|
if seeker.err != nil {
|
|
|
|
glog.Warningf("Got error in state machine: %s", seeker.err)
|
2013-08-22 08:40:23 -07:00
|
|
|
|
2014-01-21 07:49:51 -08:00
|
|
|
return &storage.OperatorError{Error: seeker.err, Continuable: !seeker.iteratorInvalid}
|
2013-08-29 06:15:22 -07:00
|
|
|
}
|
2013-08-22 08:40:23 -07:00
|
|
|
|
2013-08-29 06:15:22 -07:00
|
|
|
if seeker.iteratorInvalid {
|
|
|
|
glog.Warningf("Got illegal iterator in state machine: %s", err)
|
2013-08-22 08:40:23 -07:00
|
|
|
|
2014-01-21 07:49:51 -08:00
|
|
|
return &storage.OperatorError{Error: errIllegalIterator, Continuable: false}
|
2013-08-29 06:15:22 -07:00
|
|
|
}
|
|
|
|
|
|
|
|
if !seeker.seriesOperable {
|
|
|
|
return
|
2013-04-05 09:03:45 -07:00
|
|
|
}
|
2013-03-21 10:29:33 -07:00
|
|
|
|
2013-08-22 08:40:23 -07:00
|
|
|
lastTime, err := w.processor.Apply(w.sampleIterator, w.samples, w.stopAt, fingerprint)
|
2013-03-21 10:29:33 -07:00
|
|
|
if err != nil {
|
2013-04-05 09:03:45 -07:00
|
|
|
// We can't divine the severity of a processor error without refactoring the
|
|
|
|
// interface.
|
2014-01-21 07:49:51 -08:00
|
|
|
return &storage.OperatorError{Error: err, Continuable: false}
|
2013-03-21 10:29:33 -07:00
|
|
|
}
|
|
|
|
|
2013-08-22 08:40:23 -07:00
|
|
|
if err = w.curationState.Update(&curationKey{
|
2013-08-06 03:00:31 -07:00
|
|
|
Fingerprint: fingerprint,
|
|
|
|
ProcessorMessageRaw: w.processor.Signature(),
|
|
|
|
ProcessorMessageTypeName: w.processor.Name(),
|
|
|
|
IgnoreYoungerThan: w.ignoreYoungerThan,
|
2013-08-22 08:40:23 -07:00
|
|
|
}, lastTime); err != nil {
|
2013-04-05 09:03:45 -07:00
|
|
|
// Under the assumption that the processors are idempotent, they can be
|
|
|
|
// re-run; thusly, the commitment of the curation remark is no cause
|
|
|
|
// to cease further progress.
|
2014-01-21 07:49:51 -08:00
|
|
|
return &storage.OperatorError{Error: err, Continuable: true}
|
2013-03-21 10:29:33 -07:00
|
|
|
}
|
|
|
|
|
2013-08-06 03:00:31 -07:00
|
|
|
return nil
|
2013-06-25 05:02:27 -07:00
|
|
|
}
|
|
|
|
|
|
|
|
// curationKey provides a representation of dto.CurationKey with associated
|
|
|
|
// business logic methods attached to it to enhance code readability.
|
|
|
|
type curationKey struct {
|
|
|
|
Fingerprint *clientmodel.Fingerprint
|
|
|
|
ProcessorMessageRaw []byte
|
|
|
|
ProcessorMessageTypeName string
|
|
|
|
IgnoreYoungerThan time.Duration
|
|
|
|
}
|
|
|
|
|
|
|
|
// Equal answers whether the two curationKeys are equivalent.
|
|
|
|
func (c *curationKey) Equal(o *curationKey) bool {
|
|
|
|
switch {
|
|
|
|
case !c.Fingerprint.Equal(o.Fingerprint):
|
|
|
|
return false
|
|
|
|
case bytes.Compare(c.ProcessorMessageRaw, o.ProcessorMessageRaw) != 0:
|
|
|
|
return false
|
|
|
|
case c.ProcessorMessageTypeName != o.ProcessorMessageTypeName:
|
|
|
|
return false
|
|
|
|
case c.IgnoreYoungerThan != o.IgnoreYoungerThan:
|
|
|
|
return false
|
|
|
|
}
|
|
|
|
|
|
|
|
return true
|
|
|
|
}
|
|
|
|
|
|
|
|
func (c *curationKey) dump(d *dto.CurationKey) {
|
|
|
|
d.Reset()
|
|
|
|
|
|
|
|
// BUG(matt): Avenue for simplification.
|
|
|
|
fingerprintDTO := &dto.Fingerprint{}
|
|
|
|
|
|
|
|
dumpFingerprint(fingerprintDTO, c.Fingerprint)
|
|
|
|
|
|
|
|
d.Fingerprint = fingerprintDTO
|
|
|
|
d.ProcessorMessageRaw = c.ProcessorMessageRaw
|
|
|
|
d.ProcessorMessageTypeName = proto.String(c.ProcessorMessageTypeName)
|
|
|
|
d.IgnoreYoungerThan = proto.Int64(int64(c.IgnoreYoungerThan))
|
|
|
|
}
|
|
|
|
|
|
|
|
func (c *curationKey) load(d *dto.CurationKey) {
|
|
|
|
// BUG(matt): Avenue for simplification.
|
|
|
|
c.Fingerprint = &clientmodel.Fingerprint{}
|
|
|
|
|
|
|
|
loadFingerprint(c.Fingerprint, d.Fingerprint)
|
|
|
|
|
|
|
|
c.ProcessorMessageRaw = d.ProcessorMessageRaw
|
|
|
|
c.ProcessorMessageTypeName = d.GetProcessorMessageTypeName()
|
|
|
|
c.IgnoreYoungerThan = time.Duration(d.GetIgnoreYoungerThan())
|
2013-03-21 10:29:33 -07:00
|
|
|
}
|