Merge pull request #348 from prometheus/refactor/storage/componentize

Replace direct curation table access with wrapper.
This commit is contained in:
Matt T. Proud 2013-08-06 03:49:10 -07:00
commit e373d4c949
10 changed files with 209 additions and 231 deletions

View file

@ -64,7 +64,7 @@ type Curator struct {
// forward until the stop point or end of the series is reached. // forward until the stop point or end of the series is reached.
type watermarkScanner struct { type watermarkScanner struct {
// curationState is the data store for curation remarks. // curationState is the data store for curation remarks.
curationState raw.Persistence curationState CurationRemarker
// diskFrontier models the available seekable ranges for the provided // diskFrontier models the available seekable ranges for the provided
// sampleIterator. // sampleIterator.
diskFrontier *diskFrontier diskFrontier *diskFrontier
@ -92,7 +92,7 @@ type watermarkScanner struct {
// curated. // curated.
// curationState is the on-disk store where the curation remarks are made for // curationState is the on-disk store where the curation remarks are made for
// how much progress has been made. // how much progress has been made.
func (c *Curator) Run(ignoreYoungerThan time.Duration, instant time.Time, processor Processor, curationState, samples *leveldb.LevelDBPersistence, watermarks HighWatermarker, status chan CurationState) (err error) { func (c *Curator) Run(ignoreYoungerThan time.Duration, instant time.Time, processor Processor, curationState CurationRemarker, samples *leveldb.LevelDBPersistence, watermarks HighWatermarker, status chan CurationState) (err error) {
defer func(t time.Time) { defer func(t time.Time) {
duration := float64(time.Since(t) / time.Millisecond) duration := float64(time.Since(t) / time.Millisecond)
@ -189,26 +189,6 @@ func (w *watermarkScanner) shouldStop() bool {
return len(w.stop) != 0 return len(w.stop) != 0
} }
func (w *watermarkScanner) getCurationRemark(k *curationKey) (r *curationRemark, found bool, err error) {
curationKey := new(dto.CurationKey)
curationValue := new(dto.CurationValue)
k.dump(curationKey)
present, err := w.curationState.Get(curationKey, curationValue)
if err != nil {
return nil, false, err
}
if !present {
return nil, false, nil
}
remark := new(curationRemark)
remark.load(curationValue)
return remark, true, nil
}
func (w *watermarkScanner) Filter(key, value interface{}) (r storage.FilterResult) { func (w *watermarkScanner) Filter(key, value interface{}) (r storage.FilterResult) {
fingerprint := key.(*clientmodel.Fingerprint) fingerprint := key.(*clientmodel.Fingerprint)
@ -244,18 +224,18 @@ func (w *watermarkScanner) Filter(key, value interface{}) (r storage.FilterResul
IgnoreYoungerThan: w.ignoreYoungerThan, IgnoreYoungerThan: w.ignoreYoungerThan,
} }
curationRemark, present, err := w.getCurationRemark(k) curationRemark, present, err := w.curationState.Get(k)
if err != nil { if err != nil {
return return
} }
if !present { if !present {
return storage.ACCEPT return storage.ACCEPT
} }
if !curationRemark.OlderThan(w.stopAt) { if !curationRemark.Before(w.stopAt) {
return storage.SKIP return storage.SKIP
} }
watermark := value.(*watermarks) watermark := value.(*watermarks)
if !curationRemark.OlderThan(watermark.High) { if !curationRemark.Before(watermark.High) {
return storage.SKIP return storage.SKIP
} }
curationConsistent, err := w.curationConsistent(fingerprint, watermark) curationConsistent, err := w.curationConsistent(fingerprint, watermark)
@ -278,14 +258,14 @@ func (w *watermarkScanner) curationConsistent(f *clientmodel.Fingerprint, waterm
ProcessorMessageTypeName: w.processor.Name(), ProcessorMessageTypeName: w.processor.Name(),
IgnoreYoungerThan: w.ignoreYoungerThan, IgnoreYoungerThan: w.ignoreYoungerThan,
} }
curationRemark, present, err := w.getCurationRemark(k) curationRemark, present, err := w.curationState.Get(k)
if err != nil { if err != nil {
return false, err return false, err
} }
if !present { if !present {
return false, nil return false, nil
} }
if !curationRemark.OlderThan(watermark.High) { if !curationRemark.Before(watermark.High) {
return true, nil return true, nil
} }
@ -303,14 +283,13 @@ func (w *watermarkScanner) Operate(key, _ interface{}) (oErr *storage.OperatorEr
return &storage.OperatorError{error: err, Continuable: false} return &storage.OperatorError{error: err, Continuable: false}
} }
k := &curationKey{ curationState, present, err := w.curationState.Get(&curationKey{
Fingerprint: fingerprint, Fingerprint: fingerprint,
ProcessorMessageRaw: w.processor.Signature(), ProcessorMessageRaw: w.processor.Signature(),
ProcessorMessageTypeName: w.processor.Name(), ProcessorMessageTypeName: w.processor.Name(),
IgnoreYoungerThan: w.ignoreYoungerThan, IgnoreYoungerThan: w.ignoreYoungerThan,
} })
curationState, _, err := w.getCurationRemark(k)
if err != nil { if err != nil {
// An anomaly with the curation remark is likely not fatal in the sense that // An anomaly with the curation remark is likely not fatal in the sense that
// there was a decoding error with the entity and shouldn't be cause to stop // there was a decoding error with the entity and shouldn't be cause to stop
@ -318,10 +297,19 @@ func (w *watermarkScanner) Operate(key, _ interface{}) (oErr *storage.OperatorEr
// work forward. With an idempotent processor, this is safe. // work forward. With an idempotent processor, this is safe.
return &storage.OperatorError{error: err, Continuable: true} return &storage.OperatorError{error: err, Continuable: true}
} }
var firstSeek time.Time
switch {
case !present, seriesFrontier.After(curationState):
firstSeek = seriesFrontier.firstSupertime
case !seriesFrontier.InSafeSeekRange(curationState):
firstSeek = seriesFrontier.lastSupertime
default:
firstSeek = curationState
}
startKey := &SampleKey{ startKey := &SampleKey{
Fingerprint: fingerprint, Fingerprint: fingerprint,
FirstTimestamp: seriesFrontier.optimalStartTime(curationState), FirstTimestamp: firstSeek,
} }
dto := new(dto.SampleKey) dto := new(dto.SampleKey)
@ -345,7 +333,12 @@ func (w *watermarkScanner) Operate(key, _ interface{}) (oErr *storage.OperatorEr
return &storage.OperatorError{error: err, Continuable: false} return &storage.OperatorError{error: err, Continuable: false}
} }
err = w.refreshCurationRemark(fingerprint, lastTime) err = w.curationState.Update(&curationKey{
Fingerprint: fingerprint,
ProcessorMessageRaw: w.processor.Signature(),
ProcessorMessageTypeName: w.processor.Name(),
IgnoreYoungerThan: w.ignoreYoungerThan,
}, lastTime)
if err != nil { if err != nil {
// Under the assumption that the processors are idempotent, they can be // Under the assumption that the processors are idempotent, they can be
// re-run; thusly, the commitment of the curation remark is no cause // re-run; thusly, the commitment of the curation remark is no cause
@ -353,56 +346,7 @@ func (w *watermarkScanner) Operate(key, _ interface{}) (oErr *storage.OperatorEr
return &storage.OperatorError{error: err, Continuable: true} return &storage.OperatorError{error: err, Continuable: true}
} }
return return nil
}
func (w *watermarkScanner) refreshCurationRemark(f *clientmodel.Fingerprint, finished time.Time) error {
curationKey := curationKey{
Fingerprint: f,
ProcessorMessageRaw: w.processor.Signature(),
ProcessorMessageTypeName: w.processor.Name(),
IgnoreYoungerThan: w.ignoreYoungerThan,
}
k := new(dto.CurationKey)
curationKey.dump(k)
curationValue := curationRemark{
LastCompletionTimestamp: finished,
}
v := new(dto.CurationValue)
curationValue.dump(v)
return w.curationState.Put(k, v)
}
// curationRemark provides a representation of dto.CurationValue with associated
// business logic methods attached to it to enhance code readability.
type curationRemark struct {
LastCompletionTimestamp time.Time
}
// OlderThan answers whether this curationRemark is older than the provided
// cutOff time.
func (c *curationRemark) OlderThan(t time.Time) bool {
return c.LastCompletionTimestamp.Before(t)
}
// Equal answers whether the two curationRemarks are equivalent.
func (c *curationRemark) Equal(o curationRemark) bool {
return c.LastCompletionTimestamp.Equal(o.LastCompletionTimestamp)
}
func (c *curationRemark) String() string {
return fmt.Sprintf("Last curated at %s", c.LastCompletionTimestamp)
}
func (c *curationRemark) load(d *dto.CurationValue) {
c.LastCompletionTimestamp = time.Unix(d.GetLastCompletionTimestamp(), 0).UTC()
}
func (c *curationRemark) dump(d *dto.CurationValue) {
d.Reset()
d.LastCompletionTimestamp = proto.Int64(c.LastCompletionTimestamp.Unix())
} }
// curationKey provides a representation of dto.CurationKey with associated // curationKey provides a representation of dto.CurationKey with associated

View file

@ -194,19 +194,3 @@ func (s *seriesFrontier) InSafeSeekRange(t time.Time) (safe bool) {
func (s *seriesFrontier) After(t time.Time) bool { func (s *seriesFrontier) After(t time.Time) bool {
return s.firstSupertime.After(t) return s.firstSupertime.After(t)
} }
// optimalStartTime indicates what the best start time for a curation operation
// should be given the curation remark.
func (s *seriesFrontier) optimalStartTime(remark *curationRemark) (t time.Time) {
switch {
case remark == nil:
t = s.firstSupertime
case s.After(remark.LastCompletionTimestamp):
t = s.firstSupertime
case !s.InSafeSeekRange(remark.LastCompletionTimestamp):
t = s.lastSupertime
default:
t = remark.LastCompletionTimestamp
}
return
}

View file

@ -14,7 +14,6 @@
package metric package metric
import ( import (
"io"
"sort" "sort"
"code.google.com/p/goprotobuf/proto" "code.google.com/p/goprotobuf/proto"
@ -31,7 +30,6 @@ import (
type FingerprintMetricMapping map[clientmodel.Fingerprint]clientmodel.Metric type FingerprintMetricMapping map[clientmodel.Fingerprint]clientmodel.Metric
type FingerprintMetricIndex interface { type FingerprintMetricIndex interface {
io.Closer
raw.Pruner raw.Pruner
IndexBatch(FingerprintMetricMapping) error IndexBatch(FingerprintMetricMapping) error
@ -40,7 +38,7 @@ type FingerprintMetricIndex interface {
Size() (s uint64, present bool, err error) Size() (s uint64, present bool, err error)
} }
type LeveldbFingerprintMetricIndex struct { type LevelDBFingerprintMetricIndex struct {
p *leveldb.LevelDBPersistence p *leveldb.LevelDBPersistence
} }
@ -48,22 +46,20 @@ type LevelDBFingerprintMetricIndexOptions struct {
leveldb.LevelDBOptions leveldb.LevelDBOptions
} }
func (i *LeveldbFingerprintMetricIndex) Close() error { func (i *LevelDBFingerprintMetricIndex) Close() {
i.p.Close() i.p.Close()
return nil
} }
func (i *LeveldbFingerprintMetricIndex) State() *raw.DatabaseState { func (i *LevelDBFingerprintMetricIndex) State() *raw.DatabaseState {
return i.p.State() return i.p.State()
} }
func (i *LeveldbFingerprintMetricIndex) Size() (uint64, bool, error) { func (i *LevelDBFingerprintMetricIndex) Size() (uint64, bool, error) {
s, err := i.p.ApproximateSize() s, err := i.p.Size()
return s, true, err return s, true, err
} }
func (i *LeveldbFingerprintMetricIndex) IndexBatch(mapping FingerprintMetricMapping) error { func (i *LevelDBFingerprintMetricIndex) IndexBatch(mapping FingerprintMetricMapping) error {
b := leveldb.NewBatch() b := leveldb.NewBatch()
defer b.Close() defer b.Close()
@ -79,7 +75,7 @@ func (i *LeveldbFingerprintMetricIndex) IndexBatch(mapping FingerprintMetricMapp
return i.p.Commit(b) return i.p.Commit(b)
} }
func (i *LeveldbFingerprintMetricIndex) Lookup(f *clientmodel.Fingerprint) (m clientmodel.Metric, ok bool, err error) { func (i *LevelDBFingerprintMetricIndex) Lookup(f *clientmodel.Fingerprint) (m clientmodel.Metric, ok bool, err error) {
k := new(dto.Fingerprint) k := new(dto.Fingerprint)
dumpFingerprint(k, f) dumpFingerprint(k, f)
v := new(dto.Metric) v := new(dto.Metric)
@ -98,19 +94,19 @@ func (i *LeveldbFingerprintMetricIndex) Lookup(f *clientmodel.Fingerprint) (m cl
return m, true, nil return m, true, nil
} }
func (i *LeveldbFingerprintMetricIndex) Prune() (bool, error) { func (i *LevelDBFingerprintMetricIndex) Prune() (bool, error) {
i.p.Prune() i.p.Prune()
return false, nil return false, nil
} }
func NewLevelDBFingerprintMetricIndex(o *LevelDBFingerprintMetricIndexOptions) (FingerprintMetricIndex, error) { func NewLevelDBFingerprintMetricIndex(o *LevelDBFingerprintMetricIndexOptions) (*LevelDBFingerprintMetricIndex, error) {
s, err := leveldb.NewLevelDBPersistence(&o.LevelDBOptions) s, err := leveldb.NewLevelDBPersistence(&o.LevelDBOptions)
if err != nil { if err != nil {
return nil, err return nil, err
} }
return &LeveldbFingerprintMetricIndex{ return &LevelDBFingerprintMetricIndex{
p: s, p: s,
}, nil }, nil
} }
@ -118,7 +114,6 @@ func NewLevelDBFingerprintMetricIndex(o *LevelDBFingerprintMetricIndexOptions) (
type LabelNameFingerprintMapping map[clientmodel.LabelName]clientmodel.Fingerprints type LabelNameFingerprintMapping map[clientmodel.LabelName]clientmodel.Fingerprints
type LabelNameFingerprintIndex interface { type LabelNameFingerprintIndex interface {
io.Closer
raw.Pruner raw.Pruner
IndexBatch(LabelNameFingerprintMapping) error IndexBatch(LabelNameFingerprintMapping) error
@ -128,11 +123,11 @@ type LabelNameFingerprintIndex interface {
Size() (s uint64, present bool, err error) Size() (s uint64, present bool, err error)
} }
type LeveldbLabelNameFingerprintIndex struct { type LevelDBLabelNameFingerprintIndex struct {
p *leveldb.LevelDBPersistence p *leveldb.LevelDBPersistence
} }
func (i *LeveldbLabelNameFingerprintIndex) IndexBatch(b LabelNameFingerprintMapping) error { func (i *LevelDBLabelNameFingerprintIndex) IndexBatch(b LabelNameFingerprintMapping) error {
batch := leveldb.NewBatch() batch := leveldb.NewBatch()
defer batch.Close() defer batch.Close()
@ -155,7 +150,7 @@ func (i *LeveldbLabelNameFingerprintIndex) IndexBatch(b LabelNameFingerprintMapp
return i.p.Commit(batch) return i.p.Commit(batch)
} }
func (i *LeveldbLabelNameFingerprintIndex) Lookup(l clientmodel.LabelName) (fps clientmodel.Fingerprints, ok bool, err error) { func (i *LevelDBLabelNameFingerprintIndex) Lookup(l clientmodel.LabelName) (fps clientmodel.Fingerprints, ok bool, err error) {
k := new(dto.LabelName) k := new(dto.LabelName)
dumpLabelName(k, l) dumpLabelName(k, l)
v := new(dto.FingerprintCollection) v := new(dto.FingerprintCollection)
@ -176,30 +171,28 @@ func (i *LeveldbLabelNameFingerprintIndex) Lookup(l clientmodel.LabelName) (fps
return fps, true, nil return fps, true, nil
} }
func (i *LeveldbLabelNameFingerprintIndex) Has(l clientmodel.LabelName) (ok bool, err error) { func (i *LevelDBLabelNameFingerprintIndex) Has(l clientmodel.LabelName) (ok bool, err error) {
return i.p.Has(&dto.LabelName{ return i.p.Has(&dto.LabelName{
Name: proto.String(string(l)), Name: proto.String(string(l)),
}) })
} }
func (i *LeveldbLabelNameFingerprintIndex) Prune() (bool, error) { func (i *LevelDBLabelNameFingerprintIndex) Prune() (bool, error) {
i.p.Prune() i.p.Prune()
return false, nil return false, nil
} }
func (i *LeveldbLabelNameFingerprintIndex) Close() error { func (i *LevelDBLabelNameFingerprintIndex) Close() {
i.p.Close() i.p.Close()
return nil
} }
func (i *LeveldbLabelNameFingerprintIndex) Size() (uint64, bool, error) { func (i *LevelDBLabelNameFingerprintIndex) Size() (uint64, bool, error) {
s, err := i.p.ApproximateSize() s, err := i.p.Size()
return s, true, err return s, true, err
} }
func (i *LeveldbLabelNameFingerprintIndex) State() *raw.DatabaseState { func (i *LevelDBLabelNameFingerprintIndex) State() *raw.DatabaseState {
return i.p.State() return i.p.State()
} }
@ -207,13 +200,13 @@ type LevelDBLabelNameFingerprintIndexOptions struct {
leveldb.LevelDBOptions leveldb.LevelDBOptions
} }
func NewLevelLabelNameFingerprintIndex(o *LevelDBLabelNameFingerprintIndexOptions) (LabelNameFingerprintIndex, error) { func NewLevelLabelNameFingerprintIndex(o *LevelDBLabelNameFingerprintIndexOptions) (*LevelDBLabelNameFingerprintIndex, error) {
s, err := leveldb.NewLevelDBPersistence(&o.LevelDBOptions) s, err := leveldb.NewLevelDBPersistence(&o.LevelDBOptions)
if err != nil { if err != nil {
return nil, err return nil, err
} }
return &LeveldbLabelNameFingerprintIndex{ return &LevelDBLabelNameFingerprintIndex{
p: s, p: s,
}, nil }, nil
} }
@ -221,7 +214,6 @@ func NewLevelLabelNameFingerprintIndex(o *LevelDBLabelNameFingerprintIndexOption
type LabelSetFingerprintMapping map[LabelPair]clientmodel.Fingerprints type LabelSetFingerprintMapping map[LabelPair]clientmodel.Fingerprints
type LabelSetFingerprintIndex interface { type LabelSetFingerprintIndex interface {
io.Closer
raw.ForEacher raw.ForEacher
raw.Pruner raw.Pruner
@ -232,7 +224,7 @@ type LabelSetFingerprintIndex interface {
Size() (s uint64, present bool, err error) Size() (s uint64, present bool, err error)
} }
type LeveldbLabelSetFingerprintIndex struct { type LevelDBLabelSetFingerprintIndex struct {
p *leveldb.LevelDBPersistence p *leveldb.LevelDBPersistence
} }
@ -240,7 +232,7 @@ type LevelDBLabelSetFingerprintIndexOptions struct {
leveldb.LevelDBOptions leveldb.LevelDBOptions
} }
func (i *LeveldbLabelSetFingerprintIndex) IndexBatch(m LabelSetFingerprintMapping) error { func (i *LevelDBLabelSetFingerprintIndex) IndexBatch(m LabelSetFingerprintMapping) error {
batch := leveldb.NewBatch() batch := leveldb.NewBatch()
defer batch.Close() defer batch.Close()
@ -264,7 +256,7 @@ func (i *LeveldbLabelSetFingerprintIndex) IndexBatch(m LabelSetFingerprintMappin
return i.p.Commit(batch) return i.p.Commit(batch)
} }
func (i *LeveldbLabelSetFingerprintIndex) Lookup(p *LabelPair) (m clientmodel.Fingerprints, ok bool, err error) { func (i *LevelDBLabelSetFingerprintIndex) Lookup(p *LabelPair) (m clientmodel.Fingerprints, ok bool, err error) {
k := &dto.LabelPair{ k := &dto.LabelPair{
Name: proto.String(string(p.Name)), Name: proto.String(string(p.Name)),
Value: proto.String(string(p.Value)), Value: proto.String(string(p.Value)),
@ -289,7 +281,7 @@ func (i *LeveldbLabelSetFingerprintIndex) Lookup(p *LabelPair) (m clientmodel.Fi
return m, true, nil return m, true, nil
} }
func (i *LeveldbLabelSetFingerprintIndex) Has(p *LabelPair) (ok bool, err error) { func (i *LevelDBLabelSetFingerprintIndex) Has(p *LabelPair) (ok bool, err error) {
k := &dto.LabelPair{ k := &dto.LabelPair{
Name: proto.String(string(p.Name)), Name: proto.String(string(p.Name)),
Value: proto.String(string(p.Value)), Value: proto.String(string(p.Value)),
@ -298,43 +290,40 @@ func (i *LeveldbLabelSetFingerprintIndex) Has(p *LabelPair) (ok bool, err error)
return i.p.Has(k) return i.p.Has(k)
} }
func (i *LeveldbLabelSetFingerprintIndex) ForEach(d storage.RecordDecoder, f storage.RecordFilter, o storage.RecordOperator) (bool, error) { func (i *LevelDBLabelSetFingerprintIndex) ForEach(d storage.RecordDecoder, f storage.RecordFilter, o storage.RecordOperator) (bool, error) {
return i.p.ForEach(d, f, o) return i.p.ForEach(d, f, o)
} }
func (i *LeveldbLabelSetFingerprintIndex) Prune() (bool, error) { func (i *LevelDBLabelSetFingerprintIndex) Prune() (bool, error) {
i.p.Prune() i.p.Prune()
return false, nil return false, nil
} }
func (i *LeveldbLabelSetFingerprintIndex) Close() error { func (i *LevelDBLabelSetFingerprintIndex) Close() {
i.p.Close() i.p.Close()
return nil
} }
func (i *LeveldbLabelSetFingerprintIndex) Size() (uint64, bool, error) { func (i *LevelDBLabelSetFingerprintIndex) Size() (uint64, bool, error) {
s, err := i.p.ApproximateSize() s, err := i.p.Size()
return s, true, err return s, true, err
} }
func (i *LeveldbLabelSetFingerprintIndex) State() *raw.DatabaseState { func (i *LevelDBLabelSetFingerprintIndex) State() *raw.DatabaseState {
return i.p.State() return i.p.State()
} }
func NewLevelDBLabelSetFingerprintIndex(o *LevelDBLabelSetFingerprintIndexOptions) (LabelSetFingerprintIndex, error) { func NewLevelDBLabelSetFingerprintIndex(o *LevelDBLabelSetFingerprintIndexOptions) (*LevelDBLabelSetFingerprintIndex, error) {
s, err := leveldb.NewLevelDBPersistence(&o.LevelDBOptions) s, err := leveldb.NewLevelDBPersistence(&o.LevelDBOptions)
if err != nil { if err != nil {
return nil, err return nil, err
} }
return &LeveldbLabelSetFingerprintIndex{ return &LevelDBLabelSetFingerprintIndex{
p: s, p: s,
}, nil }, nil
} }
type MetricMembershipIndex interface { type MetricMembershipIndex interface {
io.Closer
raw.Pruner raw.Pruner
IndexBatch([]clientmodel.Metric) error IndexBatch([]clientmodel.Metric) error
@ -343,13 +332,13 @@ type MetricMembershipIndex interface {
Size() (s uint64, present bool, err error) Size() (s uint64, present bool, err error)
} }
type LeveldbMetricMembershipIndex struct { type LevelDBMetricMembershipIndex struct {
p *leveldb.LevelDBPersistence p *leveldb.LevelDBPersistence
} }
var existenceIdentity = new(dto.MembershipIndexValue) var existenceIdentity = new(dto.MembershipIndexValue)
func (i *LeveldbMetricMembershipIndex) IndexBatch(ms []clientmodel.Metric) error { func (i *LevelDBMetricMembershipIndex) IndexBatch(ms []clientmodel.Metric) error {
batch := leveldb.NewBatch() batch := leveldb.NewBatch()
defer batch.Close() defer batch.Close()
@ -362,29 +351,27 @@ func (i *LeveldbMetricMembershipIndex) IndexBatch(ms []clientmodel.Metric) error
return i.p.Commit(batch) return i.p.Commit(batch)
} }
func (i *LeveldbMetricMembershipIndex) Has(m clientmodel.Metric) (ok bool, err error) { func (i *LevelDBMetricMembershipIndex) Has(m clientmodel.Metric) (ok bool, err error) {
k := new(dto.Metric) k := new(dto.Metric)
dumpMetric(k, m) dumpMetric(k, m)
return i.p.Has(k) return i.p.Has(k)
} }
func (i *LeveldbMetricMembershipIndex) Close() error { func (i *LevelDBMetricMembershipIndex) Close() {
i.p.Close() i.p.Close()
return nil
} }
func (i *LeveldbMetricMembershipIndex) Size() (uint64, bool, error) { func (i *LevelDBMetricMembershipIndex) Size() (uint64, bool, error) {
s, err := i.p.ApproximateSize() s, err := i.p.Size()
return s, true, err return s, true, err
} }
func (i *LeveldbMetricMembershipIndex) State() *raw.DatabaseState { func (i *LevelDBMetricMembershipIndex) State() *raw.DatabaseState {
return i.p.State() return i.p.State()
} }
func (i *LeveldbMetricMembershipIndex) Prune() (bool, error) { func (i *LevelDBMetricMembershipIndex) Prune() (bool, error) {
i.p.Prune() i.p.Prune()
return false, nil return false, nil
@ -394,13 +381,13 @@ type LevelDBMetricMembershipIndexOptions struct {
leveldb.LevelDBOptions leveldb.LevelDBOptions
} }
func NewLevelDBMetricMembershipIndex(o *LevelDBMetricMembershipIndexOptions) (MetricMembershipIndex, error) { func NewLevelDBMetricMembershipIndex(o *LevelDBMetricMembershipIndexOptions) (*LevelDBMetricMembershipIndex, error) {
s, err := leveldb.NewLevelDBPersistence(&o.LevelDBOptions) s, err := leveldb.NewLevelDBPersistence(&o.LevelDBOptions)
if err != nil { if err != nil {
return nil, err return nil, err
} }
return &LeveldbMetricMembershipIndex{ return &LevelDBMetricMembershipIndex{
p: s, p: s,
}, nil }, nil
} }

View file

@ -36,7 +36,7 @@ import (
const sortConcurrency = 2 const sortConcurrency = 2
type LevelDBMetricPersistence struct { type LevelDBMetricPersistence struct {
CurationRemarks *leveldb.LevelDBPersistence CurationRemarks CurationRemarker
fingerprintToMetrics FingerprintMetricIndex fingerprintToMetrics FingerprintMetricIndex
labelNameToFingerprints LabelNameFingerprintIndex labelNameToFingerprints LabelNameFingerprintIndex
labelSetToFingerprints LabelSetFingerprintIndex labelSetToFingerprints LabelSetFingerprintIndex
@ -202,13 +202,14 @@ func NewLevelDBMetricPersistence(baseDirectory string) (*LevelDBMetricPersistenc
"Sample Curation Remarks", "Sample Curation Remarks",
func() { func() {
var err error var err error
o := &leveldb.LevelDBOptions{ emission.CurationRemarks, err = NewLevelDBCurationRemarker(&LevelDBCurationRemarkerOptions{
Name: "Sample Curation Remarks", LevelDBOptions: leveldb.LevelDBOptions{
Purpose: "Ledger of Progress for Various Curators", Name: "Sample Curation Remarks",
Path: baseDirectory + "/curation_remarks", Purpose: "Ledger of Progress for Various Curators",
CacheSizeBytes: *curationRemarksCacheSize, Path: baseDirectory + "/curation_remarks",
} CacheSizeBytes: *curationRemarksCacheSize,
emission.CurationRemarks, err = leveldb.NewLevelDBPersistence(o) },
})
workers.MayFail(err) workers.MayFail(err)
}, },
}, },
@ -764,7 +765,7 @@ func (l *LevelDBMetricPersistence) GetAllValuesForLabel(labelName clientmodel.La
return return
} }
// CompactKeyspace compacts each database's keyspace serially. // Prune compacts each database's keyspace serially.
// //
// Beware that it would probably be imprudent to run this on a live user-facing // Beware that it would probably be imprudent to run this on a live user-facing
// server due to latency implications. // server due to latency implications.
@ -778,10 +779,10 @@ func (l *LevelDBMetricPersistence) Prune() {
l.MetricSamples.Prune() l.MetricSamples.Prune()
} }
func (l *LevelDBMetricPersistence) ApproximateSizes() (total uint64, err error) { func (l *LevelDBMetricPersistence) Sizes() (total uint64, err error) {
size := uint64(0) size := uint64(0)
if size, err = l.CurationRemarks.ApproximateSize(); err != nil { if size, _, err = l.CurationRemarks.Size(); err != nil {
return 0, err return 0, err
} }
total += size total += size
@ -811,7 +812,7 @@ func (l *LevelDBMetricPersistence) ApproximateSizes() (total uint64, err error)
} }
total += size total += size
if size, err = l.MetricSamples.ApproximateSize(); err != nil { if size, err = l.MetricSamples.Size(); err != nil {
return 0, err return 0, err
} }
total += size total += size

View file

@ -72,13 +72,10 @@ func (c curationState) Get() (key, value proto.Message) {
k := &dto.CurationKey{} k := &dto.CurationKey{}
keyRaw.dump(k) keyRaw.dump(k)
key = k
valueRaw := curationRemark{ v := &dto.CurationValue{
LastCompletionTimestamp: c.lastCurated, LastCompletionTimestamp: proto.Int64(c.lastCurated.Unix()),
} }
v := &dto.CurationValue{}
valueRaw.dump(v)
return k, v return k, v
} }
@ -848,8 +845,10 @@ func TestCuratorCompactionProcessor(t *testing.T) {
sampleDirectory := fixture.NewPreparer(t).Prepare("sample", fixture.NewCassetteFactory(scenario.in.sampleGroups)) sampleDirectory := fixture.NewPreparer(t).Prepare("sample", fixture.NewCassetteFactory(scenario.in.sampleGroups))
defer sampleDirectory.Close() defer sampleDirectory.Close()
curatorStates, err := leveldb.NewLevelDBPersistence(&leveldb.LevelDBOptions{ curatorStates, err := NewLevelDBCurationRemarker(&LevelDBCurationRemarkerOptions{
Path: curatorDirectory.Path(), LevelDBOptions: leveldb.LevelDBOptions{
Path: curatorDirectory.Path(),
},
}) })
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
@ -888,7 +887,7 @@ func TestCuratorCompactionProcessor(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
iterator := curatorStates.NewIterator(true) iterator := curatorStates.p.NewIterator(true)
defer iterator.Close() defer iterator.Close()
for j, expected := range scenario.out.curationStates { for j, expected := range scenario.out.curationStates {
@ -904,38 +903,35 @@ func TestCuratorCompactionProcessor(t *testing.T) {
} }
curationKeyDto := &dto.CurationKey{} curationKeyDto := &dto.CurationKey{}
curationValueDto := &dto.CurationValue{}
err = proto.Unmarshal(iterator.Key(), curationKeyDto) err = proto.Unmarshal(iterator.Key(), curationKeyDto)
if err != nil { if err != nil {
t.Fatalf("%d.%d. could not unmarshal: %s", i, j, err) t.Fatalf("%d.%d. could not unmarshal: %s", i, j, err)
} }
err = proto.Unmarshal(iterator.Value(), curationValueDto) actualKey := new(curationKey)
if err != nil {
t.Fatalf("%d.%d. could not unmarshal: %s", i, j, err)
}
actualKey := &curationKey{}
actualKey.load(curationKeyDto) actualKey.load(curationKeyDto)
actualCurationRemark := &curationRemark{}
actualCurationRemark.load(curationValueDto) actualValue, present, err := curatorStates.Get(actualKey)
signature := expected.processor.Signature() if !present {
t.Fatalf("%d.%d. could not get key-value pair %s", i, j, actualKey)
}
if err != nil {
t.Fatalf("%d.%d. could not get key-value pair %s", i, j, err)
}
expectedFingerprint := &clientmodel.Fingerprint{} expectedFingerprint := &clientmodel.Fingerprint{}
expectedFingerprint.LoadFromString(expected.fingerprint) expectedFingerprint.LoadFromString(expected.fingerprint)
expectedKey := &curationKey{ expectedKey := &curationKey{
Fingerprint: expectedFingerprint, Fingerprint: expectedFingerprint,
IgnoreYoungerThan: expected.ignoreYoungerThan, IgnoreYoungerThan: expected.ignoreYoungerThan,
ProcessorMessageRaw: signature, ProcessorMessageRaw: expected.processor.Signature(),
ProcessorMessageTypeName: expected.processor.Name(), ProcessorMessageTypeName: expected.processor.Name(),
} }
if !actualKey.Equal(expectedKey) { if !actualKey.Equal(expectedKey) {
t.Fatalf("%d.%d. expected %s, got %s", i, j, expectedKey, actualKey) t.Fatalf("%d.%d. expected %s, got %s", i, j, expectedKey, actualKey)
} }
expectedCurationRemark := curationRemark{ if !actualValue.Equal(expected.lastCurated) {
LastCompletionTimestamp: expected.lastCurated, t.Fatalf("%d.%d. expected %s, got %s", i, j, expected.lastCurated, actualValue)
}
if !actualCurationRemark.Equal(expectedCurationRemark) {
t.Fatalf("%d.%d. expected %s, got %s", i, j, expectedCurationRemark, actualCurationRemark)
} }
} }
@ -1374,8 +1370,11 @@ func TestCuratorDeletionProcessor(t *testing.T) {
sampleDirectory := fixture.NewPreparer(t).Prepare("sample", fixture.NewCassetteFactory(scenario.in.sampleGroups)) sampleDirectory := fixture.NewPreparer(t).Prepare("sample", fixture.NewCassetteFactory(scenario.in.sampleGroups))
defer sampleDirectory.Close() defer sampleDirectory.Close()
curatorStates, err := leveldb.NewLevelDBPersistence(&leveldb.LevelDBOptions{ curatorStates, err := NewLevelDBCurationRemarker(&LevelDBCurationRemarkerOptions{
Path: curatorDirectory.Path()}) LevelDBOptions: leveldb.LevelDBOptions{
Path: curatorDirectory.Path(),
},
})
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -1412,7 +1411,7 @@ func TestCuratorDeletionProcessor(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
iterator := curatorStates.NewIterator(true) iterator := curatorStates.p.NewIterator(true)
defer iterator.Close() defer iterator.Close()
for j, expected := range scenario.out.curationStates { for j, expected := range scenario.out.curationStates {
@ -1427,24 +1426,25 @@ func TestCuratorDeletionProcessor(t *testing.T) {
} }
} }
curationKeyDto := &dto.CurationKey{} curationKeyDto := new(dto.CurationKey)
curationValueDto := &dto.CurationValue{}
err = proto.Unmarshal(iterator.Key(), curationKeyDto) err = proto.Unmarshal(iterator.Key(), curationKeyDto)
if err != nil { if err != nil {
t.Fatalf("%d.%d. could not unmarshal: %s", i, j, err) t.Fatalf("%d.%d. could not unmarshal: %s", i, j, err)
} }
err = proto.Unmarshal(iterator.Value(), curationValueDto)
if err != nil {
t.Fatalf("%d.%d. could not unmarshal: %s", i, j, err)
}
actualKey := &curationKey{} actualKey := new(curationKey)
actualKey.load(curationKeyDto) actualKey.load(curationKeyDto)
actualCurationRemark := &curationRemark{}
actualCurationRemark.load(curationValueDto)
signature := expected.processor.Signature() signature := expected.processor.Signature()
actualValue, present, err := curatorStates.Get(actualKey)
if !present {
t.Fatalf("%d.%d. could not get key-value pair %s", i, j, actualKey)
}
if err != nil {
t.Fatalf("%d.%d. could not get key-value pair %s", i, j, err)
}
expectedFingerprint := &clientmodel.Fingerprint{} expectedFingerprint := &clientmodel.Fingerprint{}
expectedFingerprint.LoadFromString(expected.fingerprint) expectedFingerprint.LoadFromString(expected.fingerprint)
expectedKey := &curationKey{ expectedKey := &curationKey{
@ -1456,11 +1456,8 @@ func TestCuratorDeletionProcessor(t *testing.T) {
if !actualKey.Equal(expectedKey) { if !actualKey.Equal(expectedKey) {
t.Fatalf("%d.%d. expected %s, got %s", i, j, expectedKey, actualKey) t.Fatalf("%d.%d. expected %s, got %s", i, j, expectedKey, actualKey)
} }
expectedCurationRemark := curationRemark{ if !actualValue.Equal(expected.lastCurated) {
LastCompletionTimestamp: expected.lastCurated, t.Fatalf("%d.%d. expected %s, got %s", i, j, expected.lastCurated, actualValue)
}
if !actualCurationRemark.Equal(expectedCurationRemark) {
t.Fatalf("%d.%d. expected %s, got %s", i, j, expectedCurationRemark, actualCurationRemark)
} }
} }

View file

@ -15,7 +15,6 @@ package metric
import ( import (
"container/list" "container/list"
"io"
"sync" "sync"
"time" "time"
@ -171,7 +170,6 @@ func (lru *WatermarkCache) checkCapacity() {
type FingerprintHighWatermarkMapping map[clientmodel.Fingerprint]time.Time type FingerprintHighWatermarkMapping map[clientmodel.Fingerprint]time.Time
type HighWatermarker interface { type HighWatermarker interface {
io.Closer
raw.ForEacher raw.ForEacher
raw.Pruner raw.Pruner
@ -181,11 +179,11 @@ type HighWatermarker interface {
Size() (uint64, bool, error) Size() (uint64, bool, error)
} }
type LeveldbHighWatermarker struct { type LevelDBHighWatermarker struct {
p *leveldb.LevelDBPersistence p *leveldb.LevelDBPersistence
} }
func (w *LeveldbHighWatermarker) Get(f *clientmodel.Fingerprint) (t time.Time, ok bool, err error) { func (w *LevelDBHighWatermarker) Get(f *clientmodel.Fingerprint) (t time.Time, ok bool, err error) {
k := new(dto.Fingerprint) k := new(dto.Fingerprint)
dumpFingerprint(k, f) dumpFingerprint(k, f)
v := new(dto.MetricHighWatermark) v := new(dto.MetricHighWatermark)
@ -200,7 +198,7 @@ func (w *LeveldbHighWatermarker) Get(f *clientmodel.Fingerprint) (t time.Time, o
return t, true, nil return t, true, nil
} }
func (w *LeveldbHighWatermarker) UpdateBatch(m FingerprintHighWatermarkMapping) error { func (w *LevelDBHighWatermarker) UpdateBatch(m FingerprintHighWatermarkMapping) error {
batch := leveldb.NewBatch() batch := leveldb.NewBatch()
defer batch.Close() defer batch.Close()
@ -229,28 +227,26 @@ func (w *LeveldbHighWatermarker) UpdateBatch(m FingerprintHighWatermarkMapping)
return w.p.Commit(batch) return w.p.Commit(batch)
} }
func (i *LeveldbHighWatermarker) ForEach(d storage.RecordDecoder, f storage.RecordFilter, o storage.RecordOperator) (bool, error) { func (w *LevelDBHighWatermarker) ForEach(d storage.RecordDecoder, f storage.RecordFilter, o storage.RecordOperator) (bool, error) {
return i.p.ForEach(d, f, o) return w.p.ForEach(d, f, o)
} }
func (i *LeveldbHighWatermarker) Prune() (bool, error) { func (w *LevelDBHighWatermarker) Prune() (bool, error) {
i.p.Prune() w.p.Prune()
return false, nil return false, nil
} }
func (i *LeveldbHighWatermarker) Close() error { func (w *LevelDBHighWatermarker) Close() {
i.p.Close() w.p.Close()
return nil
} }
func (i *LeveldbHighWatermarker) State() *raw.DatabaseState { func (w *LevelDBHighWatermarker) State() *raw.DatabaseState {
return i.p.State() return w.p.State()
} }
func (i *LeveldbHighWatermarker) Size() (uint64, bool, error) { func (w *LevelDBHighWatermarker) Size() (uint64, bool, error) {
s, err := i.p.ApproximateSize() s, err := w.p.Size()
return s, true, err return s, true, err
} }
@ -258,13 +254,82 @@ type LevelDBHighWatermarkerOptions struct {
leveldb.LevelDBOptions leveldb.LevelDBOptions
} }
func NewLevelDBHighWatermarker(o *LevelDBHighWatermarkerOptions) (HighWatermarker, error) { func NewLevelDBHighWatermarker(o *LevelDBHighWatermarkerOptions) (*LevelDBHighWatermarker, error) {
s, err := leveldb.NewLevelDBPersistence(&o.LevelDBOptions) s, err := leveldb.NewLevelDBPersistence(&o.LevelDBOptions)
if err != nil { if err != nil {
return nil, err return nil, err
} }
return &LeveldbHighWatermarker{ return &LevelDBHighWatermarker{
p: s,
}, nil
}
type CurationRemarker interface {
raw.Pruner
Update(*curationKey, time.Time) error
Get(*curationKey) (t time.Time, ok bool, err error)
State() *raw.DatabaseState
Size() (uint64, bool, error)
}
type LevelDBCurationRemarker struct {
p *leveldb.LevelDBPersistence
}
type LevelDBCurationRemarkerOptions struct {
leveldb.LevelDBOptions
}
func (w *LevelDBCurationRemarker) State() *raw.DatabaseState {
return w.p.State()
}
func (w *LevelDBCurationRemarker) Size() (uint64, bool, error) {
s, err := w.p.Size()
return s, true, err
}
func (w *LevelDBCurationRemarker) Close() {
w.p.Close()
}
func (w *LevelDBCurationRemarker) Prune() (bool, error) {
w.p.Prune()
return false, nil
}
func (w *LevelDBCurationRemarker) Get(c *curationKey) (t time.Time, ok bool, err error) {
k := new(dto.CurationKey)
c.dump(k)
v := new(dto.CurationValue)
ok, err = w.p.Get(k, v)
if err != nil || !ok {
return t, ok, err
}
return time.Unix(v.GetLastCompletionTimestamp(), 0).UTC(), true, nil
}
func (w *LevelDBCurationRemarker) Update(pair *curationKey, t time.Time) error {
k := new(dto.CurationKey)
pair.dump(k)
return w.p.Put(k, &dto.CurationValue{
LastCompletionTimestamp: proto.Int64(t.Unix()),
})
}
func NewLevelDBCurationRemarker(o *LevelDBCurationRemarkerOptions) (*LevelDBCurationRemarker, error) {
s, err := leveldb.NewLevelDBPersistence(&o.LevelDBOptions)
if err != nil {
return nil, err
}
return &LevelDBCurationRemarker{
p: s, p: s,
}, nil }, nil
} }

View file

@ -71,8 +71,8 @@ func (l *LevelDBMembershipIndex) Prune() {
l.persistence.Prune() l.persistence.Prune()
} }
func (l *LevelDBMembershipIndex) ApproximateSize() (uint64, error) { func (l *LevelDBMembershipIndex) Size() (uint64, error) {
return l.persistence.ApproximateSize() return l.persistence.Size()
} }
func (l *LevelDBMembershipIndex) State() *raw.DatabaseState { func (l *LevelDBMembershipIndex) State() *raw.DatabaseState {

View file

@ -332,7 +332,7 @@ func (l *LevelDBPersistence) Prune() {
l.storage.CompactRange(keyspace) l.storage.CompactRange(keyspace)
} }
func (l *LevelDBPersistence) ApproximateSize() (uint64, error) { func (l *LevelDBPersistence) Size() (uint64, error) {
iterator := l.NewIterator(false) iterator := l.NewIterator(false)
defer iterator.Close() defer iterator.Close()

View file

@ -31,7 +31,7 @@ func (l *LevelDBPersistence) State() *raw.DatabaseState {
Supplemental: map[string]string{}, Supplemental: map[string]string{},
} }
if size, err := l.ApproximateSize(); err != nil { if size, err := l.Size(); err != nil {
databaseState.Supplemental["Errors"] = err.Error() databaseState.Supplemental["Errors"] = err.Error()
} else { } else {
databaseState.Size = utility.ByteSize(size) databaseState.Size = utility.ByteSize(size)

View file

@ -42,10 +42,10 @@ func main() {
start := time.Now() start := time.Now()
log.Printf("Starting compaction...") log.Printf("Starting compaction...")
size, _ := persistences.ApproximateSizes() size, _ := persistences.Sizes()
log.Printf("Original Size: %d", size) log.Printf("Original Size: %d", size)
persistences.Prune() persistences.Prune()
log.Printf("Finished in %s", time.Since(start)) log.Printf("Finished in %s", time.Since(start))
size, _ = persistences.ApproximateSizes() size, _ = persistences.Sizes()
log.Printf("New Size: %d", size) log.Printf("New Size: %d", size)
} }