Persist solely Protocol Buffers.

An design question was open for me in the beginning was whether to
serialize other types to disk, but Protocol Buffers quickly won out,
which allows us to drop support for other types.  This is a good
start to cleaning up a lot of cruft in the storage stack and
can let us eventually decouple the various moving parts into
separate subsystems for easier reasoning.

This commit is not strictly required, but it is a start to making
the rest a lot more enjoyable to interact with.
This commit is contained in:
Matt T. Proud 2013-06-08 10:27:44 +02:00
parent 25bd356aac
commit a73f061d3c
13 changed files with 167 additions and 211 deletions

View file

@ -206,10 +206,10 @@ func (w watermarkFilter) shouldStop() bool {
return len(w.stop) != 0 return len(w.stop) != 0
} }
func getCurationRemark(states raw.Persistence, processor Processor, ignoreYoungerThan time.Duration, fingerprint *model.Fingerprint) (remark *model.CurationRemark, err error) { func getCurationRemark(states raw.Persistence, processor Processor, ignoreYoungerThan time.Duration, fingerprint *model.Fingerprint) (*model.CurationRemark, error) {
rawSignature, err := processor.Signature() rawSignature, err := processor.Signature()
if err != nil { if err != nil {
return return nil, err
} }
curationKey := model.CurationKey{ curationKey := model.CurationKey{
@ -220,30 +220,17 @@ func getCurationRemark(states raw.Persistence, processor Processor, ignoreYounge
}.ToDTO() }.ToDTO()
curationValue := &dto.CurationValue{} curationValue := &dto.CurationValue{}
rawKey := coding.NewPBEncoder(curationKey) present, err := states.Get(curationKey, curationValue)
has, err := states.Has(rawKey)
if err != nil { if err != nil {
return return nil, err
} }
if !has { if !present {
return return nil, nil
} }
rawCurationValue, err := states.Get(rawKey) remark := model.NewCurationRemarkFromDTO(curationValue)
if err != nil {
return
}
err = proto.Unmarshal(rawCurationValue, curationValue) return &remark, nil
if err != nil {
return
}
baseRemark := model.NewCurationRemarkFromDTO(curationValue)
remark = &baseRemark
return
} }
func (w watermarkFilter) Filter(key, value interface{}) (r storage.FilterResult) { func (w watermarkFilter) Filter(key, value interface{}) (r storage.FilterResult) {
@ -386,7 +373,7 @@ func (w watermarkOperator) refreshCurationRemark(f *model.Fingerprint, finished
LastCompletionTimestamp: finished, LastCompletionTimestamp: finished,
}.ToDTO() }.ToDTO()
err = w.curationState.Put(coding.NewPBEncoder(curationKey), coding.NewPBEncoder(curationValue)) err = w.curationState.Put(curationKey, curationValue)
return return
} }

View file

@ -14,25 +14,25 @@
package metric package metric
import ( import (
"code.google.com/p/goprotobuf/proto"
"flag" "flag"
"fmt" "fmt"
"github.com/prometheus/prometheus/coding"
"github.com/prometheus/prometheus/model"
dto "github.com/prometheus/prometheus/model/generated"
"github.com/prometheus/prometheus/storage"
index "github.com/prometheus/prometheus/storage/raw/index/leveldb"
"github.com/prometheus/prometheus/storage/raw/leveldb"
"github.com/prometheus/prometheus/utility"
"log" "log"
"sort" "sort"
"sync" "sync"
"time" "time"
"code.google.com/p/goprotobuf/proto"
dto "github.com/prometheus/prometheus/model/generated"
index "github.com/prometheus/prometheus/storage/raw/index/leveldb"
"github.com/prometheus/prometheus/model"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/storage/raw/leveldb"
"github.com/prometheus/prometheus/utility"
) )
const ( const sortConcurrency = 2
sortConcurrency = 2
)
type LevelDBMetricPersistence struct { type LevelDBMetricPersistence struct {
CurationRemarks *leveldb.LevelDBPersistence CurationRemarks *leveldb.LevelDBPersistence
@ -302,7 +302,7 @@ func (l *LevelDBMetricPersistence) indexLabelNames(metrics map[model.Fingerprint
value.Member = append(value.Member, fingerprint.ToDTO()) value.Member = append(value.Member, fingerprint.ToDTO())
} }
batch.Put(coding.NewPBEncoder(key), coding.NewPBEncoder(value)) batch.Put(key, value)
} }
err = l.labelNameToFingerprints.Commit(batch) err = l.labelNameToFingerprints.Commit(batch)
@ -375,7 +375,7 @@ func (l *LevelDBMetricPersistence) indexLabelPairs(metrics map[model.Fingerprint
value.Member = append(value.Member, fingerprint.ToDTO()) value.Member = append(value.Member, fingerprint.ToDTO())
} }
batch.Put(coding.NewPBEncoder(key), coding.NewPBEncoder(value)) batch.Put(key, value)
} }
err = l.labelSetToFingerprints.Commit(batch) err = l.labelSetToFingerprints.Commit(batch)
@ -401,9 +401,7 @@ func (l *LevelDBMetricPersistence) indexFingerprints(metrics map[model.Fingerpri
defer batch.Close() defer batch.Close()
for fingerprint, metric := range metrics { for fingerprint, metric := range metrics {
key := coding.NewPBEncoder(fingerprint.ToDTO()) batch.Put(fingerprint.ToDTO(), model.MetricToDTO(metric))
value := coding.NewPBEncoder(model.MetricToDTO(metric))
batch.Put(key, value)
} }
err = l.fingerprintToMetrics.Commit(batch) err = l.fingerprintToMetrics.Commit(batch)
@ -414,6 +412,8 @@ func (l *LevelDBMetricPersistence) indexFingerprints(metrics map[model.Fingerpri
return return
} }
var existenceIdentity = &dto.MembershipIndexValue{}
// indexMetrics takes groups of samples, determines which ones contain metrics // indexMetrics takes groups of samples, determines which ones contain metrics
// that are unknown to the storage stack, and then proceeds to update all // that are unknown to the storage stack, and then proceeds to update all
// affected indices. // affected indices.
@ -465,10 +465,8 @@ func (l *LevelDBMetricPersistence) indexMetrics(fingerprints map[model.Fingerpri
batch := leveldb.NewBatch() batch := leveldb.NewBatch()
defer batch.Close() defer batch.Close()
// WART: We should probably encode simple fingerprints.
for _, metric := range absentMetrics { for _, metric := range absentMetrics {
key := coding.NewPBEncoder(model.MetricToDTO(metric)) batch.Put(model.MetricToDTO(metric), existenceIdentity)
batch.Put(key, key)
} }
err = l.metricMembershipIndex.Commit(batch) err = l.metricMembershipIndex.Commit(batch)
@ -492,27 +490,23 @@ func (l *LevelDBMetricPersistence) refreshHighWatermarks(groups map[model.Finger
mutationCount := 0 mutationCount := 0
for fingerprint, samples := range groups { for fingerprint, samples := range groups {
keyEncoded := coding.NewPBEncoder(fingerprint.ToDTO())
value := &dto.MetricHighWatermark{} value := &dto.MetricHighWatermark{}
newestSampleTimestamp := samples[len(samples)-1].Timestamp newestSampleTimestamp := samples[len(samples)-1].Timestamp
present, err := l.MetricHighWatermarks.Get(fingerprint.ToDTO(), value)
raw, err := l.MetricHighWatermarks.Get(keyEncoded)
if err != nil { if err != nil {
return err return err
} }
if !present {
if raw != nil { continue
err = proto.Unmarshal(raw, value)
if err != nil {
return err
} }
// BUG(matt): Repace this with watermark management.
if newestSampleTimestamp.Before(time.Unix(*value.Timestamp, 0)) { if newestSampleTimestamp.Before(time.Unix(*value.Timestamp, 0)) {
continue continue
} }
}
value.Timestamp = proto.Int64(newestSampleTimestamp.Unix()) value.Timestamp = proto.Int64(newestSampleTimestamp.Unix())
batch.Put(keyEncoded, coding.NewPBEncoder(value)) batch.Put(fingerprint.ToDTO(), value)
mutationCount++ mutationCount++
} }
@ -583,7 +577,7 @@ func (l *LevelDBMetricPersistence) AppendSamples(samples model.Samples) (err err
}) })
} }
samplesBatch.Put(coding.NewPBEncoder(key), coding.NewPBEncoder(value)) samplesBatch.Put(key, value)
} }
} }
@ -656,8 +650,7 @@ func (l *LevelDBMetricPersistence) hasIndexMetric(dto *dto.Metric) (value bool,
recordOutcome(duration, err, map[string]string{operation: hasIndexMetric, result: success}, map[string]string{operation: hasIndexMetric, result: failure}) recordOutcome(duration, err, map[string]string{operation: hasIndexMetric, result: success}, map[string]string{operation: hasIndexMetric, result: failure})
}(time.Now()) }(time.Now())
dtoKey := coding.NewPBEncoder(dto) value, err = l.metricMembershipIndex.Has(dto)
value, err = l.metricMembershipIndex.Has(dtoKey)
return return
} }
@ -669,8 +662,7 @@ func (l *LevelDBMetricPersistence) HasLabelPair(dto *dto.LabelPair) (value bool,
recordOutcome(duration, err, map[string]string{operation: hasLabelPair, result: success}, map[string]string{operation: hasLabelPair, result: failure}) recordOutcome(duration, err, map[string]string{operation: hasLabelPair, result: success}, map[string]string{operation: hasLabelPair, result: failure})
}(time.Now()) }(time.Now())
dtoKey := coding.NewPBEncoder(dto) value, err = l.labelSetToFingerprints.Has(dto)
value, err = l.labelSetToFingerprints.Has(dtoKey)
return return
} }
@ -682,8 +674,7 @@ func (l *LevelDBMetricPersistence) HasLabelName(dto *dto.LabelName) (value bool,
recordOutcome(duration, err, map[string]string{operation: hasLabelName, result: success}, map[string]string{operation: hasLabelName, result: failure}) recordOutcome(duration, err, map[string]string{operation: hasLabelName, result: success}, map[string]string{operation: hasLabelName, result: failure})
}(time.Now()) }(time.Now())
dtoKey := coding.NewPBEncoder(dto) value, err = l.labelNameToFingerprints.Has(dto)
value, err = l.labelNameToFingerprints.Has(dtoKey)
return return
} }
@ -698,15 +689,13 @@ func (l *LevelDBMetricPersistence) GetFingerprintsForLabelSet(labelSet model.Lab
sets := []utility.Set{} sets := []utility.Set{}
for _, labelSetDTO := range model.LabelSetToDTOs(&labelSet) { for _, labelSetDTO := range model.LabelSetToDTOs(&labelSet) {
f, err := l.labelSetToFingerprints.Get(coding.NewPBEncoder(labelSetDTO)) unmarshaled := &dto.FingerprintCollection{}
present, err := l.labelSetToFingerprints.Get(labelSetDTO, unmarshaled)
if err != nil { if err != nil {
return fps, err return fps, err
} }
if !present {
unmarshaled := &dto.FingerprintCollection{} return nil, nil
err = proto.Unmarshal(f, unmarshaled)
if err != nil {
return fps, err
} }
set := utility.Set{} set := utility.Set{}
@ -743,16 +732,13 @@ func (l *LevelDBMetricPersistence) GetFingerprintsForLabelName(labelName model.L
recordOutcome(duration, err, map[string]string{operation: getFingerprintsForLabelName, result: success}, map[string]string{operation: getFingerprintsForLabelName, result: failure}) recordOutcome(duration, err, map[string]string{operation: getFingerprintsForLabelName, result: success}, map[string]string{operation: getFingerprintsForLabelName, result: failure})
}(time.Now()) }(time.Now())
raw, err := l.labelNameToFingerprints.Get(coding.NewPBEncoder(model.LabelNameToDTO(&labelName)))
if err != nil {
return
}
unmarshaled := &dto.FingerprintCollection{} unmarshaled := &dto.FingerprintCollection{}
present, err := l.labelNameToFingerprints.Get(model.LabelNameToDTO(&labelName), unmarshaled)
err = proto.Unmarshal(raw, unmarshaled)
if err != nil { if err != nil {
return return nil, err
}
if !present {
return nil, nil
} }
for _, m := range unmarshaled.Member { for _, m := range unmarshaled.Member {
@ -760,7 +746,7 @@ func (l *LevelDBMetricPersistence) GetFingerprintsForLabelName(labelName model.L
fps = append(fps, fp) fps = append(fps, fp)
} }
return return fps, nil
} }
func (l *LevelDBMetricPersistence) GetMetricForFingerprint(f *model.Fingerprint) (m model.Metric, err error) { func (l *LevelDBMetricPersistence) GetMetricForFingerprint(f *model.Fingerprint) (m model.Metric, err error) {
@ -770,15 +756,13 @@ func (l *LevelDBMetricPersistence) GetMetricForFingerprint(f *model.Fingerprint)
recordOutcome(duration, err, map[string]string{operation: getMetricForFingerprint, result: success}, map[string]string{operation: getMetricForFingerprint, result: failure}) recordOutcome(duration, err, map[string]string{operation: getMetricForFingerprint, result: success}, map[string]string{operation: getMetricForFingerprint, result: failure})
}(time.Now()) }(time.Now())
raw, err := l.fingerprintToMetrics.Get(coding.NewPBEncoder(model.FingerprintToDTO(f)))
if err != nil {
return
}
unmarshaled := &dto.Metric{} unmarshaled := &dto.Metric{}
err = proto.Unmarshal(raw, unmarshaled) present, err := l.fingerprintToMetrics.Get(model.FingerprintToDTO(f), unmarshaled)
if err != nil { if err != nil {
return return nil, err
}
if !present {
return nil, nil
} }
m = model.Metric{} m = model.Metric{}
@ -787,7 +771,7 @@ func (l *LevelDBMetricPersistence) GetMetricForFingerprint(f *model.Fingerprint)
m[model.LabelName(*v.Name)] = model.LabelValue(*v.Value) m[model.LabelName(*v.Name)] = model.LabelValue(*v.Value)
} }
return return m, nil
} }
func (l LevelDBMetricPersistence) GetValueAtTime(f *model.Fingerprint, t time.Time) model.Values { func (l LevelDBMetricPersistence) GetValueAtTime(f *model.Fingerprint, t time.Time) model.Values {

View file

@ -14,14 +14,16 @@
package metric package metric
import ( import (
"code.google.com/p/goprotobuf/proto"
"fmt" "fmt"
"github.com/prometheus/prometheus/coding" "time"
"github.com/prometheus/prometheus/model"
"code.google.com/p/goprotobuf/proto"
dto "github.com/prometheus/prometheus/model/generated" dto "github.com/prometheus/prometheus/model/generated"
"github.com/prometheus/prometheus/model"
"github.com/prometheus/prometheus/storage/raw" "github.com/prometheus/prometheus/storage/raw"
"github.com/prometheus/prometheus/storage/raw/leveldb" "github.com/prometheus/prometheus/storage/raw/leveldb"
"time"
) )
// processor models a post-processing agent that performs work given a sample // processor models a post-processing agent that performs work given a sample
@ -153,8 +155,7 @@ func (p CompactionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPersi
case len(pendingSamples)+len(unactedSamples) < p.MinimumGroupSize: case len(pendingSamples)+len(unactedSamples) < p.MinimumGroupSize:
if !keyDropped { if !keyDropped {
key := coding.NewPBEncoder(sampleKey.ToDTO()) pendingBatch.Drop(sampleKey.ToDTO())
pendingBatch.Drop(key)
keyDropped = true keyDropped = true
} }
pendingSamples = append(pendingSamples, unactedSamples...) pendingSamples = append(pendingSamples, unactedSamples...)
@ -165,15 +166,12 @@ func (p CompactionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPersi
// If the number of pending writes equals the target group size // If the number of pending writes equals the target group size
case len(pendingSamples) == p.MinimumGroupSize: case len(pendingSamples) == p.MinimumGroupSize:
newSampleKey := pendingSamples.ToSampleKey(fingerprint) newSampleKey := pendingSamples.ToSampleKey(fingerprint)
key := coding.NewPBEncoder(newSampleKey.ToDTO()) pendingBatch.Put(newSampleKey.ToDTO(), pendingSamples.ToDTO())
value := coding.NewPBEncoder(pendingSamples.ToDTO())
pendingBatch.Put(key, value)
pendingMutations++ pendingMutations++
lastCurated = newSampleKey.FirstTimestamp.In(time.UTC) lastCurated = newSampleKey.FirstTimestamp.In(time.UTC)
if len(unactedSamples) > 0 { if len(unactedSamples) > 0 {
if !keyDropped { if !keyDropped {
key := coding.NewPBEncoder(sampleKey.ToDTO()) pendingBatch.Drop(sampleKey.ToDTO())
pendingBatch.Drop(key)
keyDropped = true keyDropped = true
} }
@ -190,8 +188,7 @@ func (p CompactionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPersi
case len(pendingSamples)+len(unactedSamples) >= p.MinimumGroupSize: case len(pendingSamples)+len(unactedSamples) >= p.MinimumGroupSize:
if !keyDropped { if !keyDropped {
key := coding.NewPBEncoder(sampleKey.ToDTO()) pendingBatch.Drop(sampleKey.ToDTO())
pendingBatch.Drop(key)
keyDropped = true keyDropped = true
} }
remainder := p.MinimumGroupSize - len(pendingSamples) remainder := p.MinimumGroupSize - len(pendingSamples)
@ -211,9 +208,7 @@ func (p CompactionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPersi
if len(unactedSamples) > 0 || len(pendingSamples) > 0 { if len(unactedSamples) > 0 || len(pendingSamples) > 0 {
pendingSamples = append(pendingSamples, unactedSamples...) pendingSamples = append(pendingSamples, unactedSamples...)
newSampleKey := pendingSamples.ToSampleKey(fingerprint) newSampleKey := pendingSamples.ToSampleKey(fingerprint)
key := coding.NewPBEncoder(newSampleKey.ToDTO()) pendingBatch.Put(newSampleKey.ToDTO(), pendingSamples.ToDTO())
value := coding.NewPBEncoder(pendingSamples.ToDTO())
pendingBatch.Put(key, value)
pendingSamples = model.Values{} pendingSamples = model.Values{}
pendingMutations++ pendingMutations++
lastCurated = newSampleKey.FirstTimestamp.In(time.UTC) lastCurated = newSampleKey.FirstTimestamp.In(time.UTC)
@ -320,24 +315,20 @@ func (p DeletionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPersist
pendingBatch = nil pendingBatch = nil
case !sampleKey.MayContain(stopAt): case !sampleKey.MayContain(stopAt):
key := coding.NewPBEncoder(sampleKey.ToDTO()) pendingBatch.Drop(sampleKey.ToDTO())
pendingBatch.Drop(key)
lastCurated = sampleKey.LastTimestamp lastCurated = sampleKey.LastTimestamp
sampleValues = model.Values{} sampleValues = model.Values{}
pendingMutations++ pendingMutations++
case sampleKey.MayContain(stopAt): case sampleKey.MayContain(stopAt):
key := coding.NewPBEncoder(sampleKey.ToDTO()) pendingBatch.Drop(sampleKey.ToDTO())
pendingBatch.Drop(key)
pendingMutations++ pendingMutations++
sampleValues = sampleValues.TruncateBefore(stopAt) sampleValues = sampleValues.TruncateBefore(stopAt)
if len(sampleValues) > 0 { if len(sampleValues) > 0 {
sampleKey = sampleValues.ToSampleKey(fingerprint) sampleKey = sampleValues.ToSampleKey(fingerprint)
lastCurated = sampleKey.FirstTimestamp lastCurated = sampleKey.FirstTimestamp
newKey := coding.NewPBEncoder(sampleKey.ToDTO()) pendingBatch.Put(sampleKey.ToDTO(), sampleValues.ToDTO())
newValue := coding.NewPBEncoder(sampleValues.ToDTO())
pendingBatch.Put(newKey, newValue)
pendingMutations++ pendingMutations++
} else { } else {
lastCurated = sampleKey.LastTimestamp lastCurated = sampleKey.LastTimestamp

View file

@ -14,15 +14,17 @@
package metric package metric
import ( import (
"code.google.com/p/goprotobuf/proto"
"fmt" "fmt"
"github.com/prometheus/prometheus/coding"
"github.com/prometheus/prometheus/model"
dto "github.com/prometheus/prometheus/model/generated"
"github.com/prometheus/prometheus/storage/raw/leveldb"
fixture "github.com/prometheus/prometheus/storage/raw/leveldb/test"
"testing" "testing"
"time" "time"
"code.google.com/p/goprotobuf/proto"
dto "github.com/prometheus/prometheus/model/generated"
fixture "github.com/prometheus/prometheus/storage/raw/leveldb/test"
"github.com/prometheus/prometheus/model"
"github.com/prometheus/prometheus/storage/raw/leveldb"
) )
type curationState struct { type curationState struct {
@ -56,40 +58,40 @@ type out struct {
sampleGroups []sampleGroup sampleGroups []sampleGroup
} }
func (c curationState) Get() (key, value coding.Encoder) { func (c curationState) Get() (key, value proto.Message) {
signature, err := c.processor.Signature() signature, err := c.processor.Signature()
if err != nil { if err != nil {
panic(err) panic(err)
} }
key = coding.NewPBEncoder(model.CurationKey{ key = model.CurationKey{
Fingerprint: model.NewFingerprintFromRowKey(c.fingerprint), Fingerprint: model.NewFingerprintFromRowKey(c.fingerprint),
ProcessorMessageRaw: signature, ProcessorMessageRaw: signature,
ProcessorMessageTypeName: c.processor.Name(), ProcessorMessageTypeName: c.processor.Name(),
IgnoreYoungerThan: c.ignoreYoungerThan, IgnoreYoungerThan: c.ignoreYoungerThan,
}.ToDTO()) }.ToDTO()
value = coding.NewPBEncoder(model.CurationRemark{ value = model.CurationRemark{
LastCompletionTimestamp: c.lastCurated, LastCompletionTimestamp: c.lastCurated,
}.ToDTO()) }.ToDTO()
return return
} }
func (w watermarkState) Get() (key, value coding.Encoder) { func (w watermarkState) Get() (key, value proto.Message) {
key = coding.NewPBEncoder(model.NewFingerprintFromRowKey(w.fingerprint).ToDTO()) key = model.NewFingerprintFromRowKey(w.fingerprint).ToDTO()
value = coding.NewPBEncoder(model.NewWatermarkFromTime(w.lastAppended).ToMetricHighWatermarkDTO()) value = model.NewWatermarkFromTime(w.lastAppended).ToMetricHighWatermarkDTO()
return return
} }
func (s sampleGroup) Get() (key, value coding.Encoder) { func (s sampleGroup) Get() (key, value proto.Message) {
key = coding.NewPBEncoder(model.SampleKey{ key = model.SampleKey{
Fingerprint: model.NewFingerprintFromRowKey(s.fingerprint), Fingerprint: model.NewFingerprintFromRowKey(s.fingerprint),
FirstTimestamp: s.values[0].Timestamp, FirstTimestamp: s.values[0].Timestamp,
LastTimestamp: s.values[len(s.values)-1].Timestamp, LastTimestamp: s.values[len(s.values)-1].Timestamp,
SampleCount: uint32(len(s.values)), SampleCount: uint32(len(s.values)),
}.ToDTO()) }.ToDTO()
value = coding.NewPBEncoder(s.values.ToDTO()) value = s.values.ToDTO()
return return
} }

View file

@ -19,8 +19,6 @@ import (
"sort" "sort"
"time" "time"
"code.google.com/p/goprotobuf/proto"
dto "github.com/prometheus/prometheus/model/generated" dto "github.com/prometheus/prometheus/model/generated"
"github.com/prometheus/prometheus/coding" "github.com/prometheus/prometheus/coding"
@ -341,18 +339,12 @@ func (t *TieredStorage) seriesTooOld(f *model.Fingerprint, i time.Time) (bool, e
wm, ok := t.wmCache.Get(f) wm, ok := t.wmCache.Get(f)
if !ok { if !ok {
rowKey := coding.NewPBEncoder(f.ToDTO())
raw, err := t.DiskStorage.MetricHighWatermarks.Get(rowKey)
if err != nil {
return false, err
}
if raw != nil {
value := &dto.MetricHighWatermark{} value := &dto.MetricHighWatermark{}
err = proto.Unmarshal(raw, value) present, err := t.DiskStorage.MetricHighWatermarks.Get(f.ToDTO(), value)
if err != nil { if err != nil {
return false, err return false, err
} }
if present {
wmTime := time.Unix(*value.Timestamp, 0).UTC() wmTime := time.Unix(*value.Timestamp, 0).UTC()
t.wmCache.Set(f, &Watermarks{High: wmTime}) t.wmCache.Set(f, &Watermarks{High: wmTime})
return wmTime.Before(i), nil return wmTime.Before(i), nil

View file

@ -13,13 +13,11 @@
package index package index
import ( import "code.google.com/p/goprotobuf/proto"
"github.com/prometheus/prometheus/coding"
)
type MembershipIndex interface { type MembershipIndex interface {
Has(key coding.Encoder) (bool, error) Has(key proto.Message) (bool, error)
Put(key coding.Encoder) error Put(key proto.Message) error
Drop(key coding.Encoder) error Drop(key proto.Message) error
Close() Close()
} }

View file

@ -14,20 +14,15 @@
package leveldb package leveldb
import ( import (
"github.com/prometheus/prometheus/coding" "code.google.com/p/goprotobuf/proto"
dto "github.com/prometheus/prometheus/model/generated"
"github.com/prometheus/prometheus/storage/raw" "github.com/prometheus/prometheus/storage/raw"
"github.com/prometheus/prometheus/storage/raw/leveldb" "github.com/prometheus/prometheus/storage/raw/leveldb"
) )
type indexValue struct{} var existenceValue = &dto.MembershipIndexValue{}
func (i *indexValue) MustEncode() []byte {
return []byte{}
}
var (
existenceValue = &indexValue{}
)
type LevelDBMembershipIndex struct { type LevelDBMembershipIndex struct {
persistence *leveldb.LevelDBPersistence persistence *leveldb.LevelDBPersistence
@ -37,16 +32,16 @@ func (l *LevelDBMembershipIndex) Close() {
l.persistence.Close() l.persistence.Close()
} }
func (l *LevelDBMembershipIndex) Has(key coding.Encoder) (bool, error) { func (l *LevelDBMembershipIndex) Has(k proto.Message) (bool, error) {
return l.persistence.Has(key) return l.persistence.Has(k)
} }
func (l *LevelDBMembershipIndex) Drop(key coding.Encoder) error { func (l *LevelDBMembershipIndex) Drop(k proto.Message) error {
return l.persistence.Drop(key) return l.persistence.Drop(k)
} }
func (l *LevelDBMembershipIndex) Put(key coding.Encoder) error { func (l *LevelDBMembershipIndex) Put(k proto.Message) error {
return l.persistence.Put(key, existenceValue) return l.persistence.Put(k, existenceValue)
} }
func NewLevelDBMembershipIndex(storageRoot string, cacheCapacity, bitsPerBloomFilterEncoded int) (i *LevelDBMembershipIndex, err error) { func NewLevelDBMembershipIndex(storageRoot string, cacheCapacity, bitsPerBloomFilterEncoded int) (i *LevelDBMembershipIndex, err error) {

View file

@ -14,7 +14,8 @@
package raw package raw
import ( import (
"github.com/prometheus/prometheus/coding" "code.google.com/p/goprotobuf/proto"
"github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/storage"
) )
@ -25,14 +26,14 @@ type Persistence interface {
// persistence. // persistence.
Close() Close()
// Has informs the user whether a given key exists in the database. // Has informs the user whether a given key exists in the database.
Has(key coding.Encoder) (bool, error) Has(key proto.Message) (bool, error)
// Get retrieves the key from the database if it exists or returns nil if // Get retrieves the key from the database if it exists or returns nil if
// it is absent. // it is absent.
Get(key coding.Encoder) ([]byte, error) Get(key, value proto.Message) (present bool, err error)
// Drop removes the key from the database. // Drop removes the key from the database.
Drop(key coding.Encoder) error Drop(key proto.Message) error
// Put sets the key to a given value. // Put sets the key to a given value.
Put(key, value coding.Encoder) error Put(key, value proto.Message) error
// ForEach is responsible for iterating through all records in the database // ForEach is responsible for iterating through all records in the database
// until one of the following conditions are met: // until one of the following conditions are met:
// //
@ -41,7 +42,7 @@ type Persistence interface {
// 3.) A FilterResult of STOP is emitted by the Filter. // 3.) A FilterResult of STOP is emitted by the Filter.
// //
// Decoding errors for an entity cause that entity to be skipped. // Decoding errors for an entity cause that entity to be skipped.
ForEach(decoder storage.RecordDecoder, filter storage.RecordFilter, operator storage.RecordOperator) (scannedEntireCorpus bool, err error) ForEach(storage.RecordDecoder, storage.RecordFilter, storage.RecordOperator) (scannedEntireCorpus bool, err error)
// Commit applies the Batch operations to the database. // Commit applies the Batch operations to the database.
Commit(Batch) error Commit(Batch) error
} }
@ -54,7 +55,7 @@ type Batch interface {
// batch mutation. // batch mutation.
Close() Close()
// Put follows the same protocol as Persistence.Put. // Put follows the same protocol as Persistence.Put.
Put(key, value coding.Encoder) Put(key, value proto.Message)
// Drop follows the same protocol as Persistence.Drop. // Drop follows the same protocol as Persistence.Drop.
Drop(key coding.Encoder) Drop(key proto.Message)
} }

View file

@ -15,7 +15,10 @@ package leveldb
import ( import (
"fmt" "fmt"
"code.google.com/p/goprotobuf/proto"
"github.com/jmhodges/levigo" "github.com/jmhodges/levigo"
"github.com/prometheus/prometheus/coding" "github.com/prometheus/prometheus/coding"
) )
@ -31,25 +34,23 @@ func NewBatch() *batch {
} }
} }
func (b *batch) Drop(key coding.Encoder) { func (b *batch) Drop(key proto.Message) {
keyEncoded := key.MustEncode() b.batch.Delete(coding.NewPBEncoder(key).MustEncode())
b.drops++
b.batch.Delete(keyEncoded) b.drops++
} }
func (b *batch) Put(key, value coding.Encoder) { func (b *batch) Put(key, value proto.Message) {
keyEncoded := key.MustEncode() b.batch.Put(coding.NewPBEncoder(key).MustEncode(), coding.NewPBEncoder(value).MustEncode())
valueEncoded := value.MustEncode()
b.puts++ b.puts++
b.batch.Put(keyEncoded, valueEncoded)
} }
func (b batch) Close() { func (b *batch) Close() {
b.batch.Close() b.batch.Close()
} }
func (b batch) String() string { func (b *batch) String() string {
return fmt.Sprintf("LevelDB batch with %d puts and %d drops.", b.puts, b.drops) return fmt.Sprintf("LevelDB batch with %d puts and %d drops.", b.puts, b.drops)
} }

View file

@ -16,11 +16,14 @@ package leveldb
import ( import (
"flag" "flag"
"fmt" "fmt"
"time"
"code.google.com/p/goprotobuf/proto"
"github.com/jmhodges/levigo" "github.com/jmhodges/levigo"
"github.com/prometheus/prometheus/coding" "github.com/prometheus/prometheus/coding"
"github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/storage/raw" "github.com/prometheus/prometheus/storage/raw"
"time"
) )
var ( var (
@ -250,38 +253,37 @@ func (l *LevelDBPersistence) Close() {
return return
} }
func (l *LevelDBPersistence) Get(value coding.Encoder) (b []byte, err error) { func (l *LevelDBPersistence) Get(k, v proto.Message) (bool, error) {
key := value.MustEncode() raw, err := l.storage.Get(l.readOptions, coding.NewPBEncoder(k).MustEncode())
return l.storage.Get(l.readOptions, key)
}
func (l *LevelDBPersistence) Has(value coding.Encoder) (h bool, err error) {
raw, err := l.Get(value)
if err != nil { if err != nil {
return return false, err
}
if raw == nil {
return false, nil
} }
h = raw != nil if v == nil {
return true, nil
}
return err = proto.Unmarshal(raw, v)
if err != nil {
return true, err
}
return true, nil
} }
func (l *LevelDBPersistence) Drop(value coding.Encoder) (err error) { func (l *LevelDBPersistence) Has(k proto.Message) (has bool, err error) {
key := value.MustEncode() return l.Get(k, nil)
err = l.storage.Delete(l.writeOptions, key)
return
} }
func (l *LevelDBPersistence) Put(key, value coding.Encoder) (err error) { func (l *LevelDBPersistence) Drop(k proto.Message) error {
keyEncoded := key.MustEncode() return l.storage.Delete(l.writeOptions, coding.NewPBEncoder(k).MustEncode())
}
valueEncoded := value.MustEncode() func (l *LevelDBPersistence) Put(key, value proto.Message) error {
return l.storage.Put(l.writeOptions, coding.NewPBEncoder(key).MustEncode(), coding.NewPBEncoder(value).MustEncode())
err = l.storage.Put(l.writeOptions, keyEncoded, valueEncoded)
return
} }
func (l *LevelDBPersistence) Commit(b raw.Batch) (err error) { func (l *LevelDBPersistence) Commit(b raw.Batch) (err error) {

View file

@ -14,7 +14,8 @@
package test package test
import ( import (
"github.com/prometheus/prometheus/coding" "code.google.com/p/goprotobuf/proto"
"github.com/prometheus/prometheus/storage/raw/leveldb" "github.com/prometheus/prometheus/storage/raw/leveldb"
"github.com/prometheus/prometheus/utility/test" "github.com/prometheus/prometheus/utility/test"
) )
@ -28,7 +29,7 @@ type (
// Pair models a prospective (key, value) double that will be committed to // Pair models a prospective (key, value) double that will be committed to
// a database. // a database.
Pair interface { Pair interface {
Get() (key, value coding.Encoder) Get() (key, value proto.Message)
} }
// Pairs models a list of Pair for disk committing. // Pairs models a list of Pair for disk committing.
@ -47,7 +48,7 @@ type (
// data to build. // data to build.
HasNext() (has bool) HasNext() (has bool)
// Next emits the next (key, value) double for storage. // Next emits the next (key, value) double for storage.
Next() (key coding.Encoder, value coding.Encoder) Next() (key, value proto.Message)
} }
preparer struct { preparer struct {
@ -88,7 +89,7 @@ func (f cassetteFactory) HasNext() bool {
return f.index < f.count return f.index < f.count
} }
func (f *cassetteFactory) Next() (key, value coding.Encoder) { func (f *cassetteFactory) Next() (key, value proto.Message) {
key, value = f.pairs[f.index].Get() key, value = f.pairs[f.index].Get()
f.index++ f.index++

View file

@ -19,17 +19,20 @@
package main package main
import ( import (
"code.google.com/p/goprotobuf/proto"
"encoding/csv" "encoding/csv"
"flag" "flag"
"fmt" "fmt"
"github.com/prometheus/prometheus/model"
dto "github.com/prometheus/prometheus/model/generated"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/storage/metric"
"log" "log"
"os" "os"
"strconv" "strconv"
"code.google.com/p/goprotobuf/proto"
dto "github.com/prometheus/prometheus/model/generated"
"github.com/prometheus/prometheus/model"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/storage/metric"
) )
var ( var (

View file

@ -102,7 +102,6 @@ func getEmbeddedTemplate(name string) (*template.Template, error) {
return t, nil return t, nil
} }
func getTemplate(name string) (t *template.Template, err error) { func getTemplate(name string) (t *template.Template, err error) {
if *useLocalAssets { if *useLocalAssets {
t, err = getLocalTemplate(name) t, err = getLocalTemplate(name)