Merge pull request #291 from prometheus/refactor/storage/leveldb-simplifications

Persist solely Protocol Buffers.
This commit is contained in:
Matt T. Proud 2013-06-09 01:09:29 -07:00
commit 1c2ccef9a2
13 changed files with 167 additions and 211 deletions

View file

@ -206,10 +206,10 @@ func (w watermarkFilter) shouldStop() bool {
return len(w.stop) != 0 return len(w.stop) != 0
} }
func getCurationRemark(states raw.Persistence, processor Processor, ignoreYoungerThan time.Duration, fingerprint *model.Fingerprint) (remark *model.CurationRemark, err error) { func getCurationRemark(states raw.Persistence, processor Processor, ignoreYoungerThan time.Duration, fingerprint *model.Fingerprint) (*model.CurationRemark, error) {
rawSignature, err := processor.Signature() rawSignature, err := processor.Signature()
if err != nil { if err != nil {
return return nil, err
} }
curationKey := model.CurationKey{ curationKey := model.CurationKey{
@ -220,30 +220,17 @@ func getCurationRemark(states raw.Persistence, processor Processor, ignoreYounge
}.ToDTO() }.ToDTO()
curationValue := &dto.CurationValue{} curationValue := &dto.CurationValue{}
rawKey := coding.NewPBEncoder(curationKey) present, err := states.Get(curationKey, curationValue)
has, err := states.Has(rawKey)
if err != nil { if err != nil {
return return nil, err
} }
if !has { if !present {
return return nil, nil
} }
rawCurationValue, err := states.Get(rawKey) remark := model.NewCurationRemarkFromDTO(curationValue)
if err != nil {
return
}
err = proto.Unmarshal(rawCurationValue, curationValue) return &remark, nil
if err != nil {
return
}
baseRemark := model.NewCurationRemarkFromDTO(curationValue)
remark = &baseRemark
return
} }
func (w watermarkFilter) Filter(key, value interface{}) (r storage.FilterResult) { func (w watermarkFilter) Filter(key, value interface{}) (r storage.FilterResult) {
@ -386,7 +373,7 @@ func (w watermarkOperator) refreshCurationRemark(f *model.Fingerprint, finished
LastCompletionTimestamp: finished, LastCompletionTimestamp: finished,
}.ToDTO() }.ToDTO()
err = w.curationState.Put(coding.NewPBEncoder(curationKey), coding.NewPBEncoder(curationValue)) err = w.curationState.Put(curationKey, curationValue)
return return
} }

View file

@ -14,25 +14,25 @@
package metric package metric
import ( import (
"code.google.com/p/goprotobuf/proto"
"flag" "flag"
"fmt" "fmt"
"github.com/prometheus/prometheus/coding"
"github.com/prometheus/prometheus/model"
dto "github.com/prometheus/prometheus/model/generated"
"github.com/prometheus/prometheus/storage"
index "github.com/prometheus/prometheus/storage/raw/index/leveldb"
"github.com/prometheus/prometheus/storage/raw/leveldb"
"github.com/prometheus/prometheus/utility"
"log" "log"
"sort" "sort"
"sync" "sync"
"time" "time"
"code.google.com/p/goprotobuf/proto"
dto "github.com/prometheus/prometheus/model/generated"
index "github.com/prometheus/prometheus/storage/raw/index/leveldb"
"github.com/prometheus/prometheus/model"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/storage/raw/leveldb"
"github.com/prometheus/prometheus/utility"
) )
const ( const sortConcurrency = 2
sortConcurrency = 2
)
type LevelDBMetricPersistence struct { type LevelDBMetricPersistence struct {
CurationRemarks *leveldb.LevelDBPersistence CurationRemarks *leveldb.LevelDBPersistence
@ -302,7 +302,7 @@ func (l *LevelDBMetricPersistence) indexLabelNames(metrics map[model.Fingerprint
value.Member = append(value.Member, fingerprint.ToDTO()) value.Member = append(value.Member, fingerprint.ToDTO())
} }
batch.Put(coding.NewPBEncoder(key), coding.NewPBEncoder(value)) batch.Put(key, value)
} }
err = l.labelNameToFingerprints.Commit(batch) err = l.labelNameToFingerprints.Commit(batch)
@ -375,7 +375,7 @@ func (l *LevelDBMetricPersistence) indexLabelPairs(metrics map[model.Fingerprint
value.Member = append(value.Member, fingerprint.ToDTO()) value.Member = append(value.Member, fingerprint.ToDTO())
} }
batch.Put(coding.NewPBEncoder(key), coding.NewPBEncoder(value)) batch.Put(key, value)
} }
err = l.labelSetToFingerprints.Commit(batch) err = l.labelSetToFingerprints.Commit(batch)
@ -401,9 +401,7 @@ func (l *LevelDBMetricPersistence) indexFingerprints(metrics map[model.Fingerpri
defer batch.Close() defer batch.Close()
for fingerprint, metric := range metrics { for fingerprint, metric := range metrics {
key := coding.NewPBEncoder(fingerprint.ToDTO()) batch.Put(fingerprint.ToDTO(), model.MetricToDTO(metric))
value := coding.NewPBEncoder(model.MetricToDTO(metric))
batch.Put(key, value)
} }
err = l.fingerprintToMetrics.Commit(batch) err = l.fingerprintToMetrics.Commit(batch)
@ -414,6 +412,8 @@ func (l *LevelDBMetricPersistence) indexFingerprints(metrics map[model.Fingerpri
return return
} }
var existenceIdentity = &dto.MembershipIndexValue{}
// indexMetrics takes groups of samples, determines which ones contain metrics // indexMetrics takes groups of samples, determines which ones contain metrics
// that are unknown to the storage stack, and then proceeds to update all // that are unknown to the storage stack, and then proceeds to update all
// affected indices. // affected indices.
@ -465,10 +465,8 @@ func (l *LevelDBMetricPersistence) indexMetrics(fingerprints map[model.Fingerpri
batch := leveldb.NewBatch() batch := leveldb.NewBatch()
defer batch.Close() defer batch.Close()
// WART: We should probably encode simple fingerprints.
for _, metric := range absentMetrics { for _, metric := range absentMetrics {
key := coding.NewPBEncoder(model.MetricToDTO(metric)) batch.Put(model.MetricToDTO(metric), existenceIdentity)
batch.Put(key, key)
} }
err = l.metricMembershipIndex.Commit(batch) err = l.metricMembershipIndex.Commit(batch)
@ -492,27 +490,23 @@ func (l *LevelDBMetricPersistence) refreshHighWatermarks(groups map[model.Finger
mutationCount := 0 mutationCount := 0
for fingerprint, samples := range groups { for fingerprint, samples := range groups {
keyEncoded := coding.NewPBEncoder(fingerprint.ToDTO())
value := &dto.MetricHighWatermark{} value := &dto.MetricHighWatermark{}
newestSampleTimestamp := samples[len(samples)-1].Timestamp newestSampleTimestamp := samples[len(samples)-1].Timestamp
present, err := l.MetricHighWatermarks.Get(fingerprint.ToDTO(), value)
raw, err := l.MetricHighWatermarks.Get(keyEncoded)
if err != nil { if err != nil {
return err return err
} }
if !present {
if raw != nil { continue
err = proto.Unmarshal(raw, value)
if err != nil {
return err
}
if newestSampleTimestamp.Before(time.Unix(*value.Timestamp, 0)) {
continue
}
} }
// BUG(matt): Repace this with watermark management.
if newestSampleTimestamp.Before(time.Unix(*value.Timestamp, 0)) {
continue
}
value.Timestamp = proto.Int64(newestSampleTimestamp.Unix()) value.Timestamp = proto.Int64(newestSampleTimestamp.Unix())
batch.Put(keyEncoded, coding.NewPBEncoder(value)) batch.Put(fingerprint.ToDTO(), value)
mutationCount++ mutationCount++
} }
@ -583,7 +577,7 @@ func (l *LevelDBMetricPersistence) AppendSamples(samples model.Samples) (err err
}) })
} }
samplesBatch.Put(coding.NewPBEncoder(key), coding.NewPBEncoder(value)) samplesBatch.Put(key, value)
} }
} }
@ -656,8 +650,7 @@ func (l *LevelDBMetricPersistence) hasIndexMetric(dto *dto.Metric) (value bool,
recordOutcome(duration, err, map[string]string{operation: hasIndexMetric, result: success}, map[string]string{operation: hasIndexMetric, result: failure}) recordOutcome(duration, err, map[string]string{operation: hasIndexMetric, result: success}, map[string]string{operation: hasIndexMetric, result: failure})
}(time.Now()) }(time.Now())
dtoKey := coding.NewPBEncoder(dto) value, err = l.metricMembershipIndex.Has(dto)
value, err = l.metricMembershipIndex.Has(dtoKey)
return return
} }
@ -669,8 +662,7 @@ func (l *LevelDBMetricPersistence) HasLabelPair(dto *dto.LabelPair) (value bool,
recordOutcome(duration, err, map[string]string{operation: hasLabelPair, result: success}, map[string]string{operation: hasLabelPair, result: failure}) recordOutcome(duration, err, map[string]string{operation: hasLabelPair, result: success}, map[string]string{operation: hasLabelPair, result: failure})
}(time.Now()) }(time.Now())
dtoKey := coding.NewPBEncoder(dto) value, err = l.labelSetToFingerprints.Has(dto)
value, err = l.labelSetToFingerprints.Has(dtoKey)
return return
} }
@ -682,8 +674,7 @@ func (l *LevelDBMetricPersistence) HasLabelName(dto *dto.LabelName) (value bool,
recordOutcome(duration, err, map[string]string{operation: hasLabelName, result: success}, map[string]string{operation: hasLabelName, result: failure}) recordOutcome(duration, err, map[string]string{operation: hasLabelName, result: success}, map[string]string{operation: hasLabelName, result: failure})
}(time.Now()) }(time.Now())
dtoKey := coding.NewPBEncoder(dto) value, err = l.labelNameToFingerprints.Has(dto)
value, err = l.labelNameToFingerprints.Has(dtoKey)
return return
} }
@ -698,15 +689,13 @@ func (l *LevelDBMetricPersistence) GetFingerprintsForLabelSet(labelSet model.Lab
sets := []utility.Set{} sets := []utility.Set{}
for _, labelSetDTO := range model.LabelSetToDTOs(&labelSet) { for _, labelSetDTO := range model.LabelSetToDTOs(&labelSet) {
f, err := l.labelSetToFingerprints.Get(coding.NewPBEncoder(labelSetDTO)) unmarshaled := &dto.FingerprintCollection{}
present, err := l.labelSetToFingerprints.Get(labelSetDTO, unmarshaled)
if err != nil { if err != nil {
return fps, err return fps, err
} }
if !present {
unmarshaled := &dto.FingerprintCollection{} return nil, nil
err = proto.Unmarshal(f, unmarshaled)
if err != nil {
return fps, err
} }
set := utility.Set{} set := utility.Set{}
@ -743,16 +732,13 @@ func (l *LevelDBMetricPersistence) GetFingerprintsForLabelName(labelName model.L
recordOutcome(duration, err, map[string]string{operation: getFingerprintsForLabelName, result: success}, map[string]string{operation: getFingerprintsForLabelName, result: failure}) recordOutcome(duration, err, map[string]string{operation: getFingerprintsForLabelName, result: success}, map[string]string{operation: getFingerprintsForLabelName, result: failure})
}(time.Now()) }(time.Now())
raw, err := l.labelNameToFingerprints.Get(coding.NewPBEncoder(model.LabelNameToDTO(&labelName)))
if err != nil {
return
}
unmarshaled := &dto.FingerprintCollection{} unmarshaled := &dto.FingerprintCollection{}
present, err := l.labelNameToFingerprints.Get(model.LabelNameToDTO(&labelName), unmarshaled)
err = proto.Unmarshal(raw, unmarshaled)
if err != nil { if err != nil {
return return nil, err
}
if !present {
return nil, nil
} }
for _, m := range unmarshaled.Member { for _, m := range unmarshaled.Member {
@ -760,7 +746,7 @@ func (l *LevelDBMetricPersistence) GetFingerprintsForLabelName(labelName model.L
fps = append(fps, fp) fps = append(fps, fp)
} }
return return fps, nil
} }
func (l *LevelDBMetricPersistence) GetMetricForFingerprint(f *model.Fingerprint) (m model.Metric, err error) { func (l *LevelDBMetricPersistence) GetMetricForFingerprint(f *model.Fingerprint) (m model.Metric, err error) {
@ -770,15 +756,13 @@ func (l *LevelDBMetricPersistence) GetMetricForFingerprint(f *model.Fingerprint)
recordOutcome(duration, err, map[string]string{operation: getMetricForFingerprint, result: success}, map[string]string{operation: getMetricForFingerprint, result: failure}) recordOutcome(duration, err, map[string]string{operation: getMetricForFingerprint, result: success}, map[string]string{operation: getMetricForFingerprint, result: failure})
}(time.Now()) }(time.Now())
raw, err := l.fingerprintToMetrics.Get(coding.NewPBEncoder(model.FingerprintToDTO(f)))
if err != nil {
return
}
unmarshaled := &dto.Metric{} unmarshaled := &dto.Metric{}
err = proto.Unmarshal(raw, unmarshaled) present, err := l.fingerprintToMetrics.Get(model.FingerprintToDTO(f), unmarshaled)
if err != nil { if err != nil {
return return nil, err
}
if !present {
return nil, nil
} }
m = model.Metric{} m = model.Metric{}
@ -787,7 +771,7 @@ func (l *LevelDBMetricPersistence) GetMetricForFingerprint(f *model.Fingerprint)
m[model.LabelName(*v.Name)] = model.LabelValue(*v.Value) m[model.LabelName(*v.Name)] = model.LabelValue(*v.Value)
} }
return return m, nil
} }
func (l LevelDBMetricPersistence) GetValueAtTime(f *model.Fingerprint, t time.Time) model.Values { func (l LevelDBMetricPersistence) GetValueAtTime(f *model.Fingerprint, t time.Time) model.Values {

View file

@ -14,14 +14,16 @@
package metric package metric
import ( import (
"code.google.com/p/goprotobuf/proto"
"fmt" "fmt"
"github.com/prometheus/prometheus/coding" "time"
"github.com/prometheus/prometheus/model"
"code.google.com/p/goprotobuf/proto"
dto "github.com/prometheus/prometheus/model/generated" dto "github.com/prometheus/prometheus/model/generated"
"github.com/prometheus/prometheus/model"
"github.com/prometheus/prometheus/storage/raw" "github.com/prometheus/prometheus/storage/raw"
"github.com/prometheus/prometheus/storage/raw/leveldb" "github.com/prometheus/prometheus/storage/raw/leveldb"
"time"
) )
// processor models a post-processing agent that performs work given a sample // processor models a post-processing agent that performs work given a sample
@ -153,8 +155,7 @@ func (p CompactionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPersi
case len(pendingSamples)+len(unactedSamples) < p.MinimumGroupSize: case len(pendingSamples)+len(unactedSamples) < p.MinimumGroupSize:
if !keyDropped { if !keyDropped {
key := coding.NewPBEncoder(sampleKey.ToDTO()) pendingBatch.Drop(sampleKey.ToDTO())
pendingBatch.Drop(key)
keyDropped = true keyDropped = true
} }
pendingSamples = append(pendingSamples, unactedSamples...) pendingSamples = append(pendingSamples, unactedSamples...)
@ -165,15 +166,12 @@ func (p CompactionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPersi
// If the number of pending writes equals the target group size // If the number of pending writes equals the target group size
case len(pendingSamples) == p.MinimumGroupSize: case len(pendingSamples) == p.MinimumGroupSize:
newSampleKey := pendingSamples.ToSampleKey(fingerprint) newSampleKey := pendingSamples.ToSampleKey(fingerprint)
key := coding.NewPBEncoder(newSampleKey.ToDTO()) pendingBatch.Put(newSampleKey.ToDTO(), pendingSamples.ToDTO())
value := coding.NewPBEncoder(pendingSamples.ToDTO())
pendingBatch.Put(key, value)
pendingMutations++ pendingMutations++
lastCurated = newSampleKey.FirstTimestamp.In(time.UTC) lastCurated = newSampleKey.FirstTimestamp.In(time.UTC)
if len(unactedSamples) > 0 { if len(unactedSamples) > 0 {
if !keyDropped { if !keyDropped {
key := coding.NewPBEncoder(sampleKey.ToDTO()) pendingBatch.Drop(sampleKey.ToDTO())
pendingBatch.Drop(key)
keyDropped = true keyDropped = true
} }
@ -190,8 +188,7 @@ func (p CompactionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPersi
case len(pendingSamples)+len(unactedSamples) >= p.MinimumGroupSize: case len(pendingSamples)+len(unactedSamples) >= p.MinimumGroupSize:
if !keyDropped { if !keyDropped {
key := coding.NewPBEncoder(sampleKey.ToDTO()) pendingBatch.Drop(sampleKey.ToDTO())
pendingBatch.Drop(key)
keyDropped = true keyDropped = true
} }
remainder := p.MinimumGroupSize - len(pendingSamples) remainder := p.MinimumGroupSize - len(pendingSamples)
@ -211,9 +208,7 @@ func (p CompactionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPersi
if len(unactedSamples) > 0 || len(pendingSamples) > 0 { if len(unactedSamples) > 0 || len(pendingSamples) > 0 {
pendingSamples = append(pendingSamples, unactedSamples...) pendingSamples = append(pendingSamples, unactedSamples...)
newSampleKey := pendingSamples.ToSampleKey(fingerprint) newSampleKey := pendingSamples.ToSampleKey(fingerprint)
key := coding.NewPBEncoder(newSampleKey.ToDTO()) pendingBatch.Put(newSampleKey.ToDTO(), pendingSamples.ToDTO())
value := coding.NewPBEncoder(pendingSamples.ToDTO())
pendingBatch.Put(key, value)
pendingSamples = model.Values{} pendingSamples = model.Values{}
pendingMutations++ pendingMutations++
lastCurated = newSampleKey.FirstTimestamp.In(time.UTC) lastCurated = newSampleKey.FirstTimestamp.In(time.UTC)
@ -320,24 +315,20 @@ func (p DeletionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPersist
pendingBatch = nil pendingBatch = nil
case !sampleKey.MayContain(stopAt): case !sampleKey.MayContain(stopAt):
key := coding.NewPBEncoder(sampleKey.ToDTO()) pendingBatch.Drop(sampleKey.ToDTO())
pendingBatch.Drop(key)
lastCurated = sampleKey.LastTimestamp lastCurated = sampleKey.LastTimestamp
sampleValues = model.Values{} sampleValues = model.Values{}
pendingMutations++ pendingMutations++
case sampleKey.MayContain(stopAt): case sampleKey.MayContain(stopAt):
key := coding.NewPBEncoder(sampleKey.ToDTO()) pendingBatch.Drop(sampleKey.ToDTO())
pendingBatch.Drop(key)
pendingMutations++ pendingMutations++
sampleValues = sampleValues.TruncateBefore(stopAt) sampleValues = sampleValues.TruncateBefore(stopAt)
if len(sampleValues) > 0 { if len(sampleValues) > 0 {
sampleKey = sampleValues.ToSampleKey(fingerprint) sampleKey = sampleValues.ToSampleKey(fingerprint)
lastCurated = sampleKey.FirstTimestamp lastCurated = sampleKey.FirstTimestamp
newKey := coding.NewPBEncoder(sampleKey.ToDTO()) pendingBatch.Put(sampleKey.ToDTO(), sampleValues.ToDTO())
newValue := coding.NewPBEncoder(sampleValues.ToDTO())
pendingBatch.Put(newKey, newValue)
pendingMutations++ pendingMutations++
} else { } else {
lastCurated = sampleKey.LastTimestamp lastCurated = sampleKey.LastTimestamp

View file

@ -14,15 +14,17 @@
package metric package metric
import ( import (
"code.google.com/p/goprotobuf/proto"
"fmt" "fmt"
"github.com/prometheus/prometheus/coding"
"github.com/prometheus/prometheus/model"
dto "github.com/prometheus/prometheus/model/generated"
"github.com/prometheus/prometheus/storage/raw/leveldb"
fixture "github.com/prometheus/prometheus/storage/raw/leveldb/test"
"testing" "testing"
"time" "time"
"code.google.com/p/goprotobuf/proto"
dto "github.com/prometheus/prometheus/model/generated"
fixture "github.com/prometheus/prometheus/storage/raw/leveldb/test"
"github.com/prometheus/prometheus/model"
"github.com/prometheus/prometheus/storage/raw/leveldb"
) )
type curationState struct { type curationState struct {
@ -56,40 +58,40 @@ type out struct {
sampleGroups []sampleGroup sampleGroups []sampleGroup
} }
func (c curationState) Get() (key, value coding.Encoder) { func (c curationState) Get() (key, value proto.Message) {
signature, err := c.processor.Signature() signature, err := c.processor.Signature()
if err != nil { if err != nil {
panic(err) panic(err)
} }
key = coding.NewPBEncoder(model.CurationKey{ key = model.CurationKey{
Fingerprint: model.NewFingerprintFromRowKey(c.fingerprint), Fingerprint: model.NewFingerprintFromRowKey(c.fingerprint),
ProcessorMessageRaw: signature, ProcessorMessageRaw: signature,
ProcessorMessageTypeName: c.processor.Name(), ProcessorMessageTypeName: c.processor.Name(),
IgnoreYoungerThan: c.ignoreYoungerThan, IgnoreYoungerThan: c.ignoreYoungerThan,
}.ToDTO()) }.ToDTO()
value = coding.NewPBEncoder(model.CurationRemark{ value = model.CurationRemark{
LastCompletionTimestamp: c.lastCurated, LastCompletionTimestamp: c.lastCurated,
}.ToDTO()) }.ToDTO()
return return
} }
func (w watermarkState) Get() (key, value coding.Encoder) { func (w watermarkState) Get() (key, value proto.Message) {
key = coding.NewPBEncoder(model.NewFingerprintFromRowKey(w.fingerprint).ToDTO()) key = model.NewFingerprintFromRowKey(w.fingerprint).ToDTO()
value = coding.NewPBEncoder(model.NewWatermarkFromTime(w.lastAppended).ToMetricHighWatermarkDTO()) value = model.NewWatermarkFromTime(w.lastAppended).ToMetricHighWatermarkDTO()
return return
} }
func (s sampleGroup) Get() (key, value coding.Encoder) { func (s sampleGroup) Get() (key, value proto.Message) {
key = coding.NewPBEncoder(model.SampleKey{ key = model.SampleKey{
Fingerprint: model.NewFingerprintFromRowKey(s.fingerprint), Fingerprint: model.NewFingerprintFromRowKey(s.fingerprint),
FirstTimestamp: s.values[0].Timestamp, FirstTimestamp: s.values[0].Timestamp,
LastTimestamp: s.values[len(s.values)-1].Timestamp, LastTimestamp: s.values[len(s.values)-1].Timestamp,
SampleCount: uint32(len(s.values)), SampleCount: uint32(len(s.values)),
}.ToDTO()) }.ToDTO()
value = coding.NewPBEncoder(s.values.ToDTO()) value = s.values.ToDTO()
return return
} }

View file

@ -19,8 +19,6 @@ import (
"sort" "sort"
"time" "time"
"code.google.com/p/goprotobuf/proto"
dto "github.com/prometheus/prometheus/model/generated" dto "github.com/prometheus/prometheus/model/generated"
"github.com/prometheus/prometheus/coding" "github.com/prometheus/prometheus/coding"
@ -341,18 +339,12 @@ func (t *TieredStorage) seriesTooOld(f *model.Fingerprint, i time.Time) (bool, e
wm, ok := t.wmCache.Get(f) wm, ok := t.wmCache.Get(f)
if !ok { if !ok {
rowKey := coding.NewPBEncoder(f.ToDTO()) value := &dto.MetricHighWatermark{}
raw, err := t.DiskStorage.MetricHighWatermarks.Get(rowKey) present, err := t.DiskStorage.MetricHighWatermarks.Get(f.ToDTO(), value)
if err != nil { if err != nil {
return false, err return false, err
} }
if raw != nil { if present {
value := &dto.MetricHighWatermark{}
err = proto.Unmarshal(raw, value)
if err != nil {
return false, err
}
wmTime := time.Unix(*value.Timestamp, 0).UTC() wmTime := time.Unix(*value.Timestamp, 0).UTC()
t.wmCache.Set(f, &Watermarks{High: wmTime}) t.wmCache.Set(f, &Watermarks{High: wmTime})
return wmTime.Before(i), nil return wmTime.Before(i), nil

View file

@ -13,13 +13,11 @@
package index package index
import ( import "code.google.com/p/goprotobuf/proto"
"github.com/prometheus/prometheus/coding"
)
type MembershipIndex interface { type MembershipIndex interface {
Has(key coding.Encoder) (bool, error) Has(key proto.Message) (bool, error)
Put(key coding.Encoder) error Put(key proto.Message) error
Drop(key coding.Encoder) error Drop(key proto.Message) error
Close() Close()
} }

View file

@ -14,20 +14,15 @@
package leveldb package leveldb
import ( import (
"github.com/prometheus/prometheus/coding" "code.google.com/p/goprotobuf/proto"
dto "github.com/prometheus/prometheus/model/generated"
"github.com/prometheus/prometheus/storage/raw" "github.com/prometheus/prometheus/storage/raw"
"github.com/prometheus/prometheus/storage/raw/leveldb" "github.com/prometheus/prometheus/storage/raw/leveldb"
) )
type indexValue struct{} var existenceValue = &dto.MembershipIndexValue{}
func (i *indexValue) MustEncode() []byte {
return []byte{}
}
var (
existenceValue = &indexValue{}
)
type LevelDBMembershipIndex struct { type LevelDBMembershipIndex struct {
persistence *leveldb.LevelDBPersistence persistence *leveldb.LevelDBPersistence
@ -37,16 +32,16 @@ func (l *LevelDBMembershipIndex) Close() {
l.persistence.Close() l.persistence.Close()
} }
func (l *LevelDBMembershipIndex) Has(key coding.Encoder) (bool, error) { func (l *LevelDBMembershipIndex) Has(k proto.Message) (bool, error) {
return l.persistence.Has(key) return l.persistence.Has(k)
} }
func (l *LevelDBMembershipIndex) Drop(key coding.Encoder) error { func (l *LevelDBMembershipIndex) Drop(k proto.Message) error {
return l.persistence.Drop(key) return l.persistence.Drop(k)
} }
func (l *LevelDBMembershipIndex) Put(key coding.Encoder) error { func (l *LevelDBMembershipIndex) Put(k proto.Message) error {
return l.persistence.Put(key, existenceValue) return l.persistence.Put(k, existenceValue)
} }
func NewLevelDBMembershipIndex(storageRoot string, cacheCapacity, bitsPerBloomFilterEncoded int) (i *LevelDBMembershipIndex, err error) { func NewLevelDBMembershipIndex(storageRoot string, cacheCapacity, bitsPerBloomFilterEncoded int) (i *LevelDBMembershipIndex, err error) {

View file

@ -14,7 +14,8 @@
package raw package raw
import ( import (
"github.com/prometheus/prometheus/coding" "code.google.com/p/goprotobuf/proto"
"github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/storage"
) )
@ -25,14 +26,14 @@ type Persistence interface {
// persistence. // persistence.
Close() Close()
// Has informs the user whether a given key exists in the database. // Has informs the user whether a given key exists in the database.
Has(key coding.Encoder) (bool, error) Has(key proto.Message) (bool, error)
// Get retrieves the key from the database if it exists or returns nil if // Get retrieves the key from the database if it exists or returns nil if
// it is absent. // it is absent.
Get(key coding.Encoder) ([]byte, error) Get(key, value proto.Message) (present bool, err error)
// Drop removes the key from the database. // Drop removes the key from the database.
Drop(key coding.Encoder) error Drop(key proto.Message) error
// Put sets the key to a given value. // Put sets the key to a given value.
Put(key, value coding.Encoder) error Put(key, value proto.Message) error
// ForEach is responsible for iterating through all records in the database // ForEach is responsible for iterating through all records in the database
// until one of the following conditions are met: // until one of the following conditions are met:
// //
@ -41,7 +42,7 @@ type Persistence interface {
// 3.) A FilterResult of STOP is emitted by the Filter. // 3.) A FilterResult of STOP is emitted by the Filter.
// //
// Decoding errors for an entity cause that entity to be skipped. // Decoding errors for an entity cause that entity to be skipped.
ForEach(decoder storage.RecordDecoder, filter storage.RecordFilter, operator storage.RecordOperator) (scannedEntireCorpus bool, err error) ForEach(storage.RecordDecoder, storage.RecordFilter, storage.RecordOperator) (scannedEntireCorpus bool, err error)
// Commit applies the Batch operations to the database. // Commit applies the Batch operations to the database.
Commit(Batch) error Commit(Batch) error
} }
@ -54,7 +55,7 @@ type Batch interface {
// batch mutation. // batch mutation.
Close() Close()
// Put follows the same protocol as Persistence.Put. // Put follows the same protocol as Persistence.Put.
Put(key, value coding.Encoder) Put(key, value proto.Message)
// Drop follows the same protocol as Persistence.Drop. // Drop follows the same protocol as Persistence.Drop.
Drop(key coding.Encoder) Drop(key proto.Message)
} }

View file

@ -15,7 +15,10 @@ package leveldb
import ( import (
"fmt" "fmt"
"code.google.com/p/goprotobuf/proto"
"github.com/jmhodges/levigo" "github.com/jmhodges/levigo"
"github.com/prometheus/prometheus/coding" "github.com/prometheus/prometheus/coding"
) )
@ -31,25 +34,23 @@ func NewBatch() *batch {
} }
} }
func (b *batch) Drop(key coding.Encoder) { func (b *batch) Drop(key proto.Message) {
keyEncoded := key.MustEncode() b.batch.Delete(coding.NewPBEncoder(key).MustEncode())
b.drops++
b.batch.Delete(keyEncoded) b.drops++
} }
func (b *batch) Put(key, value coding.Encoder) { func (b *batch) Put(key, value proto.Message) {
keyEncoded := key.MustEncode() b.batch.Put(coding.NewPBEncoder(key).MustEncode(), coding.NewPBEncoder(value).MustEncode())
valueEncoded := value.MustEncode()
b.puts++ b.puts++
b.batch.Put(keyEncoded, valueEncoded)
} }
func (b batch) Close() { func (b *batch) Close() {
b.batch.Close() b.batch.Close()
} }
func (b batch) String() string { func (b *batch) String() string {
return fmt.Sprintf("LevelDB batch with %d puts and %d drops.", b.puts, b.drops) return fmt.Sprintf("LevelDB batch with %d puts and %d drops.", b.puts, b.drops)
} }

View file

@ -16,11 +16,14 @@ package leveldb
import ( import (
"flag" "flag"
"fmt" "fmt"
"time"
"code.google.com/p/goprotobuf/proto"
"github.com/jmhodges/levigo" "github.com/jmhodges/levigo"
"github.com/prometheus/prometheus/coding" "github.com/prometheus/prometheus/coding"
"github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/storage/raw" "github.com/prometheus/prometheus/storage/raw"
"time"
) )
var ( var (
@ -250,38 +253,37 @@ func (l *LevelDBPersistence) Close() {
return return
} }
func (l *LevelDBPersistence) Get(value coding.Encoder) (b []byte, err error) { func (l *LevelDBPersistence) Get(k, v proto.Message) (bool, error) {
key := value.MustEncode() raw, err := l.storage.Get(l.readOptions, coding.NewPBEncoder(k).MustEncode())
return l.storage.Get(l.readOptions, key)
}
func (l *LevelDBPersistence) Has(value coding.Encoder) (h bool, err error) {
raw, err := l.Get(value)
if err != nil { if err != nil {
return return false, err
}
if raw == nil {
return false, nil
} }
h = raw != nil if v == nil {
return true, nil
}
return err = proto.Unmarshal(raw, v)
if err != nil {
return true, err
}
return true, nil
} }
func (l *LevelDBPersistence) Drop(value coding.Encoder) (err error) { func (l *LevelDBPersistence) Has(k proto.Message) (has bool, err error) {
key := value.MustEncode() return l.Get(k, nil)
err = l.storage.Delete(l.writeOptions, key)
return
} }
func (l *LevelDBPersistence) Put(key, value coding.Encoder) (err error) { func (l *LevelDBPersistence) Drop(k proto.Message) error {
keyEncoded := key.MustEncode() return l.storage.Delete(l.writeOptions, coding.NewPBEncoder(k).MustEncode())
}
valueEncoded := value.MustEncode() func (l *LevelDBPersistence) Put(key, value proto.Message) error {
return l.storage.Put(l.writeOptions, coding.NewPBEncoder(key).MustEncode(), coding.NewPBEncoder(value).MustEncode())
err = l.storage.Put(l.writeOptions, keyEncoded, valueEncoded)
return
} }
func (l *LevelDBPersistence) Commit(b raw.Batch) (err error) { func (l *LevelDBPersistence) Commit(b raw.Batch) (err error) {

View file

@ -14,7 +14,8 @@
package test package test
import ( import (
"github.com/prometheus/prometheus/coding" "code.google.com/p/goprotobuf/proto"
"github.com/prometheus/prometheus/storage/raw/leveldb" "github.com/prometheus/prometheus/storage/raw/leveldb"
"github.com/prometheus/prometheus/utility/test" "github.com/prometheus/prometheus/utility/test"
) )
@ -28,7 +29,7 @@ type (
// Pair models a prospective (key, value) double that will be committed to // Pair models a prospective (key, value) double that will be committed to
// a database. // a database.
Pair interface { Pair interface {
Get() (key, value coding.Encoder) Get() (key, value proto.Message)
} }
// Pairs models a list of Pair for disk committing. // Pairs models a list of Pair for disk committing.
@ -47,7 +48,7 @@ type (
// data to build. // data to build.
HasNext() (has bool) HasNext() (has bool)
// Next emits the next (key, value) double for storage. // Next emits the next (key, value) double for storage.
Next() (key coding.Encoder, value coding.Encoder) Next() (key, value proto.Message)
} }
preparer struct { preparer struct {
@ -88,7 +89,7 @@ func (f cassetteFactory) HasNext() bool {
return f.index < f.count return f.index < f.count
} }
func (f *cassetteFactory) Next() (key, value coding.Encoder) { func (f *cassetteFactory) Next() (key, value proto.Message) {
key, value = f.pairs[f.index].Get() key, value = f.pairs[f.index].Get()
f.index++ f.index++

View file

@ -19,17 +19,20 @@
package main package main
import ( import (
"code.google.com/p/goprotobuf/proto"
"encoding/csv" "encoding/csv"
"flag" "flag"
"fmt" "fmt"
"github.com/prometheus/prometheus/model"
dto "github.com/prometheus/prometheus/model/generated"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/storage/metric"
"log" "log"
"os" "os"
"strconv" "strconv"
"code.google.com/p/goprotobuf/proto"
dto "github.com/prometheus/prometheus/model/generated"
"github.com/prometheus/prometheus/model"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/storage/metric"
) )
var ( var (

View file

@ -102,7 +102,6 @@ func getEmbeddedTemplate(name string) (*template.Template, error) {
return t, nil return t, nil
} }
func getTemplate(name string) (t *template.Template, err error) { func getTemplate(name string) (t *template.Template, err error) {
if *useLocalAssets { if *useLocalAssets {
t, err = getLocalTemplate(name) t, err = getLocalTemplate(name)