Merge "Major code cleanup in storage."

This commit is contained in:
Julius Volz 2014-02-27 18:04:05 +01:00 committed by Gerrit Code Review
commit 24a0ccdd54
33 changed files with 602 additions and 670 deletions

View file

@ -27,34 +27,34 @@ type FilterResult int
const ( const (
// Stop scanning the database. // Stop scanning the database.
STOP FilterResult = iota Stop FilterResult = iota
// Skip this record but continue scanning. // Skip this record but continue scanning.
SKIP Skip
// Accept this record for the Operator. // Accept this record for the Operator.
ACCEPT Accept
) )
func (f FilterResult) String() string { func (f FilterResult) String() string {
switch f { switch f {
case STOP: case Stop:
return "STOP" return "STOP"
case SKIP: case Skip:
return "SKIP" return "SKIP"
case ACCEPT: case Accept:
return "ACCEPT" return "ACCEPT"
} }
panic("unknown") panic("unknown")
} }
type OperatorErrorType int // OperatorError is used for storage operations upon errors that may or may not
// be continuable.
type OperatorError struct { type OperatorError struct {
Error error Error error
Continuable bool Continuable bool
} }
// Filter is responsible for controlling the behavior of the database scan // RecordFilter is responsible for controlling the behavior of the database scan
// process and determines the disposition of various records. // process and determines the disposition of various records.
// //
// The protocol around it makes the assumption that the underlying // The protocol around it makes the assumption that the underlying

View file

@ -60,7 +60,7 @@ func (c *compactionChecker) Operate(key, value interface{}) *storage.OperatorErr
if sampleKey.FirstTimestamp.After(sampleKey.LastTimestamp) { if sampleKey.FirstTimestamp.After(sampleKey.LastTimestamp) {
c.t.Fatalf("Chunk FirstTimestamp (%v) is after LastTimestamp (%v): %v", sampleKey.FirstTimestamp.Unix(), sampleKey.LastTimestamp.Unix(), sampleKey) c.t.Fatalf("Chunk FirstTimestamp (%v) is after LastTimestamp (%v): %v", sampleKey.FirstTimestamp.Unix(), sampleKey.LastTimestamp.Unix(), sampleKey)
} }
fp := new(clientmodel.Fingerprint) fp := &clientmodel.Fingerprint{}
for _, sample := range value.(Values) { for _, sample := range value.(Values) {
if sample.Timestamp.Before(sampleKey.FirstTimestamp) || sample.Timestamp.After(sampleKey.LastTimestamp) { if sample.Timestamp.Before(sampleKey.FirstTimestamp) || sample.Timestamp.After(sampleKey.LastTimestamp) {
c.t.Fatalf("Sample not within chunk boundaries: chunk FirstTimestamp (%v), chunk LastTimestamp (%v) vs. sample Timestamp (%v)", sampleKey.FirstTimestamp.Unix(), sampleKey.LastTimestamp.Unix(), sample.Timestamp) c.t.Fatalf("Sample not within chunk boundaries: chunk FirstTimestamp (%v), chunk LastTimestamp (%v) vs. sample Timestamp (%v)", sampleKey.FirstTimestamp.Unix(), sampleKey.LastTimestamp.Unix(), sample.Timestamp)

View file

@ -34,7 +34,7 @@ import (
const curationYieldPeriod = 250 * time.Millisecond const curationYieldPeriod = 250 * time.Millisecond
var errIllegalIterator = errors.New("Iterator invalid.") var errIllegalIterator = errors.New("iterator invalid")
// CurationStateUpdater receives updates about the curation state. // CurationStateUpdater receives updates about the curation state.
type CurationStateUpdater interface { type CurationStateUpdater interface {
@ -50,6 +50,7 @@ type CurationState struct {
Fingerprint *clientmodel.Fingerprint Fingerprint *clientmodel.Fingerprint
} }
// CuratorOptions bundles the parameters needed to create a Curator.
type CuratorOptions struct { type CuratorOptions struct {
// Stop functions as a channel that when empty allows the curator to operate. // Stop functions as a channel that when empty allows the curator to operate.
// The moment a value is ingested inside of it, the curator goes into drain // The moment a value is ingested inside of it, the curator goes into drain
@ -59,7 +60,7 @@ type CuratorOptions struct {
ViewQueue chan viewJob ViewQueue chan viewJob
} }
// curator is responsible for effectuating a given curation policy across the // Curator is responsible for effectuating a given curation policy across the
// stored samples on-disk. This is useful to compact sparse sample values into // stored samples on-disk. This is useful to compact sparse sample values into
// single sample entities to reduce keyspace load on the datastore. // single sample entities to reduce keyspace load on the datastore.
type Curator struct { type Curator struct {
@ -71,6 +72,7 @@ type Curator struct {
sampleKeys *sampleKeyList sampleKeys *sampleKeyList
} }
// NewCurator returns an initialized Curator.
func NewCurator(o *CuratorOptions) *Curator { func NewCurator(o *CuratorOptions) *Curator {
return &Curator{ return &Curator{
stop: o.Stop, stop: o.Stop,
@ -122,7 +124,7 @@ type watermarkScanner struct {
sampleKeys *sampleKeyList sampleKeys *sampleKeyList
} }
// run facilitates the curation lifecycle. // Run facilitates the curation lifecycle.
// //
// recencyThreshold represents the most recent time up to which values will be // recencyThreshold represents the most recent time up to which values will be
// curated. // curated.
@ -214,7 +216,7 @@ func (c *Curator) Run(ignoreYoungerThan time.Duration, instant clientmodel.Times
return return
} }
// drain instructs the curator to stop at the next convenient moment as to not // Drain instructs the curator to stop at the next convenient moment as to not
// introduce data inconsistencies. // introduce data inconsistencies.
func (c *Curator) Drain() { func (c *Curator) Drain() {
if len(c.stop) == 0 { if len(c.stop) == 0 {
@ -222,34 +224,35 @@ func (c *Curator) Drain() {
} }
} }
// Close needs to be called to cleanly dispose of a curator.
func (c *Curator) Close() { func (c *Curator) Close() {
c.dtoSampleKeys.Close() c.dtoSampleKeys.Close()
c.sampleKeys.Close() c.sampleKeys.Close()
} }
func (w *watermarkScanner) DecodeKey(in interface{}) (interface{}, error) { func (w *watermarkScanner) DecodeKey(in interface{}) (interface{}, error) {
key := new(dto.Fingerprint) key := &dto.Fingerprint{}
bytes := in.([]byte) bytes := in.([]byte)
if err := proto.Unmarshal(bytes, key); err != nil { if err := proto.Unmarshal(bytes, key); err != nil {
return nil, err return nil, err
} }
fingerprint := new(clientmodel.Fingerprint) fingerprint := &clientmodel.Fingerprint{}
loadFingerprint(fingerprint, key) loadFingerprint(fingerprint, key)
return fingerprint, nil return fingerprint, nil
} }
func (w *watermarkScanner) DecodeValue(in interface{}) (interface{}, error) { func (w *watermarkScanner) DecodeValue(in interface{}) (interface{}, error) {
value := new(dto.MetricHighWatermark) value := &dto.MetricHighWatermark{}
bytes := in.([]byte) bytes := in.([]byte)
if err := proto.Unmarshal(bytes, value); err != nil { if err := proto.Unmarshal(bytes, value); err != nil {
return nil, err return nil, err
} }
watermark := new(watermarks) watermark := &watermarks{}
watermark.load(value) watermark.load(value)
return watermark, nil return watermark, nil
@ -280,7 +283,7 @@ func (w *watermarkScanner) Filter(key, value interface{}) (r storage.FilterResul
}() }()
if w.shouldStop() { if w.shouldStop() {
return storage.STOP return storage.Stop
} }
k := &curationKey{ k := &curationKey{
@ -295,24 +298,24 @@ func (w *watermarkScanner) Filter(key, value interface{}) (r storage.FilterResul
return return
} }
if !present { if !present {
return storage.ACCEPT return storage.Accept
} }
if !curationRemark.Before(w.stopAt) { if !curationRemark.Before(w.stopAt) {
return storage.SKIP return storage.Skip
} }
watermark := value.(*watermarks) watermark := value.(*watermarks)
if !curationRemark.Before(watermark.High) { if !curationRemark.Before(watermark.High) {
return storage.SKIP return storage.Skip
} }
curationConsistent, err := w.curationConsistent(fingerprint, watermark) curationConsistent, err := w.curationConsistent(fingerprint, watermark)
if err != nil { if err != nil {
return return
} }
if curationConsistent { if curationConsistent {
return storage.SKIP return storage.Skip
} }
return storage.ACCEPT return storage.Accept
} }
// curationConsistent determines whether the given metric is in a dirty state // curationConsistent determines whether the given metric is in a dirty state

View file

@ -272,8 +272,9 @@ func AppendRepeatingValuesTests(p MetricPersistence, t test.Tester) {
} }
} }
if true { v, ok := p.(View)
// XXX: Purely a benchmark. if !ok {
// It's purely a benchmark for a MetricPersistence that is not viewable.
return return
} }
@ -294,7 +295,7 @@ func AppendRepeatingValuesTests(p MetricPersistence, t test.Tester) {
} }
time := clientmodel.Timestamp(0).Add(time.Duration(i) * time.Hour).Add(time.Duration(j) * time.Second) time := clientmodel.Timestamp(0).Add(time.Duration(i) * time.Hour).Add(time.Duration(j) * time.Second)
samples := p.GetValueAtTime(fingerprints[0], time) samples := v.GetValueAtTime(fingerprints[0], time)
if len(samples) == 0 { if len(samples) == 0 {
t.Fatal("expected at least one sample.") t.Fatal("expected at least one sample.")
} }
@ -303,7 +304,7 @@ func AppendRepeatingValuesTests(p MetricPersistence, t test.Tester) {
for _, sample := range samples { for _, sample := range samples {
if sample.Value != expected { if sample.Value != expected {
t.Fatalf("expected %d value, got %d", expected, sample.Value) t.Fatalf("expected %v value, got %v", expected, sample.Value)
} }
} }
} }
@ -334,8 +335,9 @@ func AppendsRepeatingValuesTests(p MetricPersistence, t test.Tester) {
p.AppendSamples(s) p.AppendSamples(s)
if true { v, ok := p.(View)
// XXX: Purely a benchmark. if !ok {
// It's purely a benchmark for a MetricPersistance that is not viewable.
return return
} }
@ -356,7 +358,7 @@ func AppendsRepeatingValuesTests(p MetricPersistence, t test.Tester) {
} }
time := clientmodel.Timestamp(0).Add(time.Duration(i) * time.Hour).Add(time.Duration(j) * time.Second) time := clientmodel.Timestamp(0).Add(time.Duration(i) * time.Hour).Add(time.Duration(j) * time.Second)
samples := p.GetValueAtTime(fingerprints[0], time) samples := v.GetValueAtTime(fingerprints[0], time)
if len(samples) == 0 { if len(samples) == 0 {
t.Fatal("expected at least one sample.") t.Fatal("expected at least one sample.")
} }
@ -365,7 +367,7 @@ func AppendsRepeatingValuesTests(p MetricPersistence, t test.Tester) {
for _, sample := range samples { for _, sample := range samples {
if sample.Value != expected { if sample.Value != expected {
t.Fatalf("expected %d value, got %d", expected, sample.Value) t.Fatalf("expected %v value, got %v", expected, sample.Value)
} }
} }
} }

View file

@ -34,7 +34,7 @@ func (l *dtoSampleKeyList) Get() (*dto.SampleKey, bool) {
return v.(*dto.SampleKey), ok return v.(*dto.SampleKey), ok
} }
return new(dto.SampleKey), false return &dto.SampleKey{}, false
} }
func (l *dtoSampleKeyList) Give(v *dto.SampleKey) bool { func (l *dtoSampleKeyList) Give(v *dto.SampleKey) bool {
@ -51,7 +51,7 @@ type sampleKeyList struct {
l utility.FreeList l utility.FreeList
} }
var defaultSampleKey = new(SampleKey) var defaultSampleKey = &SampleKey{}
func newSampleKeyList(cap int) *sampleKeyList { func newSampleKeyList(cap int) *sampleKeyList {
return &sampleKeyList{ return &sampleKeyList{
@ -64,7 +64,7 @@ func (l *sampleKeyList) Get() (*SampleKey, bool) {
return v.(*SampleKey), ok return v.(*SampleKey), ok
} }
return new(SampleKey), false return &SampleKey{}, false
} }
func (l *sampleKeyList) Give(v *SampleKey) bool { func (l *sampleKeyList) Give(v *SampleKey) bool {
@ -86,10 +86,10 @@ func (l *valueAtTimeList) Get() (*getValuesAtTimeOp, bool) {
return v.(*getValuesAtTimeOp), ok return v.(*getValuesAtTimeOp), ok
} }
return new(getValuesAtTimeOp), false return &getValuesAtTimeOp{}, false
} }
var pGetValuesAtTimeOp = new(getValuesAtTimeOp) var pGetValuesAtTimeOp = &getValuesAtTimeOp{}
func (l *valueAtTimeList) Give(v *getValuesAtTimeOp) bool { func (l *valueAtTimeList) Give(v *getValuesAtTimeOp) bool {
*v = *pGetValuesAtTimeOp *v = *pGetValuesAtTimeOp
@ -112,10 +112,10 @@ func (l *valueAtIntervalList) Get() (*getValuesAtIntervalOp, bool) {
return v.(*getValuesAtIntervalOp), ok return v.(*getValuesAtIntervalOp), ok
} }
return new(getValuesAtIntervalOp), false return &getValuesAtIntervalOp{}, false
} }
var pGetValuesAtIntervalOp = new(getValuesAtIntervalOp) var pGetValuesAtIntervalOp = &getValuesAtIntervalOp{}
func (l *valueAtIntervalList) Give(v *getValuesAtIntervalOp) bool { func (l *valueAtIntervalList) Give(v *getValuesAtIntervalOp) bool {
*v = *pGetValuesAtIntervalOp *v = *pGetValuesAtIntervalOp
@ -138,10 +138,10 @@ func (l *valueAlongRangeList) Get() (*getValuesAlongRangeOp, bool) {
return v.(*getValuesAlongRangeOp), ok return v.(*getValuesAlongRangeOp), ok
} }
return new(getValuesAlongRangeOp), false return &getValuesAlongRangeOp{}, false
} }
var pGetValuesAlongRangeOp = new(getValuesAlongRangeOp) var pGetValuesAlongRangeOp = &getValuesAlongRangeOp{}
func (l *valueAlongRangeList) Give(v *getValuesAlongRangeOp) bool { func (l *valueAlongRangeList) Give(v *getValuesAlongRangeOp) bool {
*v = *pGetValuesAlongRangeOp *v = *pGetValuesAlongRangeOp
@ -164,10 +164,10 @@ func (l *valueAtIntervalAlongRangeList) Get() (*getValueRangeAtIntervalOp, bool)
return v.(*getValueRangeAtIntervalOp), ok return v.(*getValueRangeAtIntervalOp), ok
} }
return new(getValueRangeAtIntervalOp), false return &getValueRangeAtIntervalOp{}, false
} }
var pGetValueRangeAtIntervalOp = new(getValueRangeAtIntervalOp) var pGetValueRangeAtIntervalOp = &getValueRangeAtIntervalOp{}
func (l *valueAtIntervalAlongRangeList) Give(v *getValueRangeAtIntervalOp) bool { func (l *valueAtIntervalAlongRangeList) Give(v *getValueRangeAtIntervalOp) bool {
*v = *pGetValueRangeAtIntervalOp *v = *pGetValueRangeAtIntervalOp

View file

@ -21,8 +21,6 @@ import (
"code.google.com/p/goprotobuf/proto" "code.google.com/p/goprotobuf/proto"
clientmodel "github.com/prometheus/client_golang/model" clientmodel "github.com/prometheus/client_golang/model"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/storage/raw" "github.com/prometheus/prometheus/storage/raw"
"github.com/prometheus/prometheus/storage/raw/leveldb" "github.com/prometheus/prometheus/storage/raw/leveldb"
"github.com/prometheus/prometheus/utility" "github.com/prometheus/prometheus/utility"
@ -30,59 +28,52 @@ import (
dto "github.com/prometheus/prometheus/model/generated" dto "github.com/prometheus/prometheus/model/generated"
) )
// FingerprintMetricMapping is an in-memory map of Fingerprints to Metrics.
type FingerprintMetricMapping map[clientmodel.Fingerprint]clientmodel.Metric type FingerprintMetricMapping map[clientmodel.Fingerprint]clientmodel.Metric
// FingerprintMetricIndex models a database mapping Fingerprints to Metrics.
type FingerprintMetricIndex interface { type FingerprintMetricIndex interface {
raw.Database
raw.Pruner raw.Pruner
IndexBatch(FingerprintMetricMapping) error IndexBatch(FingerprintMetricMapping) error
Lookup(*clientmodel.Fingerprint) (m clientmodel.Metric, ok bool, err error) Lookup(*clientmodel.Fingerprint) (m clientmodel.Metric, ok bool, err error)
State() *raw.DatabaseState
Size() (s uint64, present bool, err error)
} }
// LevelDBFingerprintMetricIndex implements FingerprintMetricIndex using
// leveldb.
type LevelDBFingerprintMetricIndex struct { type LevelDBFingerprintMetricIndex struct {
p *leveldb.LevelDBPersistence *leveldb.LevelDBPersistence
} }
// LevelDBFingerprintMetricIndexOptions just wraps leveldb.LevelDBOptions.
type LevelDBFingerprintMetricIndexOptions struct { type LevelDBFingerprintMetricIndexOptions struct {
leveldb.LevelDBOptions leveldb.LevelDBOptions
} }
func (i *LevelDBFingerprintMetricIndex) Close() { // IndexBatch implements FingerprintMetricIndex.
i.p.Close()
}
func (i *LevelDBFingerprintMetricIndex) State() *raw.DatabaseState {
return i.p.State()
}
func (i *LevelDBFingerprintMetricIndex) Size() (uint64, bool, error) {
s, err := i.p.Size()
return s, true, err
}
func (i *LevelDBFingerprintMetricIndex) IndexBatch(mapping FingerprintMetricMapping) error { func (i *LevelDBFingerprintMetricIndex) IndexBatch(mapping FingerprintMetricMapping) error {
b := leveldb.NewBatch() b := leveldb.NewBatch()
defer b.Close() defer b.Close()
for f, m := range mapping { for f, m := range mapping {
k := new(dto.Fingerprint) k := &dto.Fingerprint{}
dumpFingerprint(k, &f) dumpFingerprint(k, &f)
v := new(dto.Metric) v := &dto.Metric{}
dumpMetric(v, m) dumpMetric(v, m)
b.Put(k, v) b.Put(k, v)
} }
return i.p.Commit(b) return i.LevelDBPersistence.Commit(b)
} }
// Lookup implements FingerprintMetricIndex.
func (i *LevelDBFingerprintMetricIndex) Lookup(f *clientmodel.Fingerprint) (m clientmodel.Metric, ok bool, err error) { func (i *LevelDBFingerprintMetricIndex) Lookup(f *clientmodel.Fingerprint) (m clientmodel.Metric, ok bool, err error) {
k := new(dto.Fingerprint) k := &dto.Fingerprint{}
dumpFingerprint(k, f) dumpFingerprint(k, f)
v := new(dto.Metric) v := &dto.Metric{}
if ok, err := i.p.Get(k, v); !ok { if ok, err := i.LevelDBPersistence.Get(k, v); !ok {
return nil, false, nil return nil, false, nil
} else if err != nil { } else if err != nil {
return nil, false, err return nil, false, err
@ -97,12 +88,8 @@ func (i *LevelDBFingerprintMetricIndex) Lookup(f *clientmodel.Fingerprint) (m cl
return m, true, nil return m, true, nil
} }
func (i *LevelDBFingerprintMetricIndex) Prune() (bool, error) { // NewLevelDBFingerprintMetricIndex returns a LevelDBFingerprintMetricIndex
i.p.Prune() // object ready to use.
return false, nil
}
func NewLevelDBFingerprintMetricIndex(o LevelDBFingerprintMetricIndexOptions) (*LevelDBFingerprintMetricIndex, error) { func NewLevelDBFingerprintMetricIndex(o LevelDBFingerprintMetricIndexOptions) (*LevelDBFingerprintMetricIndex, error) {
s, err := leveldb.NewLevelDBPersistence(o.LevelDBOptions) s, err := leveldb.NewLevelDBPersistence(o.LevelDBOptions)
if err != nil { if err != nil {
@ -110,26 +97,32 @@ func NewLevelDBFingerprintMetricIndex(o LevelDBFingerprintMetricIndexOptions) (*
} }
return &LevelDBFingerprintMetricIndex{ return &LevelDBFingerprintMetricIndex{
p: s, LevelDBPersistence: s,
}, nil }, nil
} }
// LabelNameFingerprintMapping is an in-memory map of LabelNames to
// Fingerprints.
type LabelNameFingerprintMapping map[clientmodel.LabelName]clientmodel.Fingerprints type LabelNameFingerprintMapping map[clientmodel.LabelName]clientmodel.Fingerprints
// LabelNameFingerprintIndex models a database mapping LabelNames to
// Fingerprints.
type LabelNameFingerprintIndex interface { type LabelNameFingerprintIndex interface {
raw.Database
raw.Pruner raw.Pruner
IndexBatch(LabelNameFingerprintMapping) error IndexBatch(LabelNameFingerprintMapping) error
Lookup(clientmodel.LabelName) (fps clientmodel.Fingerprints, ok bool, err error) Lookup(clientmodel.LabelName) (fps clientmodel.Fingerprints, ok bool, err error)
Has(clientmodel.LabelName) (ok bool, err error) Has(clientmodel.LabelName) (ok bool, err error)
State() *raw.DatabaseState
Size() (s uint64, present bool, err error)
} }
// LevelDBLabelNameFingerprintIndex implements LabelNameFingerprintIndex using
// leveldb.
type LevelDBLabelNameFingerprintIndex struct { type LevelDBLabelNameFingerprintIndex struct {
p *leveldb.LevelDBPersistence *leveldb.LevelDBPersistence
} }
// IndexBatch implements LabelNameFingerprintIndex.
func (i *LevelDBLabelNameFingerprintIndex) IndexBatch(b LabelNameFingerprintMapping) error { func (i *LevelDBLabelNameFingerprintIndex) IndexBatch(b LabelNameFingerprintMapping) error {
batch := leveldb.NewBatch() batch := leveldb.NewBatch()
defer batch.Close() defer batch.Close()
@ -140,9 +133,9 @@ func (i *LevelDBLabelNameFingerprintIndex) IndexBatch(b LabelNameFingerprintMapp
key := &dto.LabelName{ key := &dto.LabelName{
Name: proto.String(string(labelName)), Name: proto.String(string(labelName)),
} }
value := new(dto.FingerprintCollection) value := &dto.FingerprintCollection{}
for _, fingerprint := range fingerprints { for _, fingerprint := range fingerprints {
f := new(dto.Fingerprint) f := &dto.Fingerprint{}
dumpFingerprint(f, fingerprint) dumpFingerprint(f, fingerprint)
value.Member = append(value.Member, f) value.Member = append(value.Member, f)
} }
@ -150,14 +143,15 @@ func (i *LevelDBLabelNameFingerprintIndex) IndexBatch(b LabelNameFingerprintMapp
batch.Put(key, value) batch.Put(key, value)
} }
return i.p.Commit(batch) return i.LevelDBPersistence.Commit(batch)
} }
// Lookup implements LabelNameFingerprintIndex.
func (i *LevelDBLabelNameFingerprintIndex) Lookup(l clientmodel.LabelName) (fps clientmodel.Fingerprints, ok bool, err error) { func (i *LevelDBLabelNameFingerprintIndex) Lookup(l clientmodel.LabelName) (fps clientmodel.Fingerprints, ok bool, err error) {
k := new(dto.LabelName) k := &dto.LabelName{}
dumpLabelName(k, l) dumpLabelName(k, l)
v := new(dto.FingerprintCollection) v := &dto.FingerprintCollection{}
ok, err = i.p.Get(k, v) ok, err = i.LevelDBPersistence.Get(k, v)
if err != nil { if err != nil {
return nil, false, err return nil, false, err
} }
@ -166,7 +160,7 @@ func (i *LevelDBLabelNameFingerprintIndex) Lookup(l clientmodel.LabelName) (fps
} }
for _, m := range v.Member { for _, m := range v.Member {
fp := new(clientmodel.Fingerprint) fp := &clientmodel.Fingerprint{}
loadFingerprint(fp, m) loadFingerprint(fp, m)
fps = append(fps, fp) fps = append(fps, fp)
} }
@ -174,35 +168,20 @@ func (i *LevelDBLabelNameFingerprintIndex) Lookup(l clientmodel.LabelName) (fps
return fps, true, nil return fps, true, nil
} }
// Has implements LabelNameFingerprintIndex.
func (i *LevelDBLabelNameFingerprintIndex) Has(l clientmodel.LabelName) (ok bool, err error) { func (i *LevelDBLabelNameFingerprintIndex) Has(l clientmodel.LabelName) (ok bool, err error) {
return i.p.Has(&dto.LabelName{ return i.LevelDBPersistence.Has(&dto.LabelName{
Name: proto.String(string(l)), Name: proto.String(string(l)),
}) })
} }
func (i *LevelDBLabelNameFingerprintIndex) Prune() (bool, error) { // LevelDBLabelNameFingerprintIndexOptions just wraps leveldb.LevelDBOptions.
i.p.Prune()
return false, nil
}
func (i *LevelDBLabelNameFingerprintIndex) Close() {
i.p.Close()
}
func (i *LevelDBLabelNameFingerprintIndex) Size() (uint64, bool, error) {
s, err := i.p.Size()
return s, true, err
}
func (i *LevelDBLabelNameFingerprintIndex) State() *raw.DatabaseState {
return i.p.State()
}
type LevelDBLabelNameFingerprintIndexOptions struct { type LevelDBLabelNameFingerprintIndexOptions struct {
leveldb.LevelDBOptions leveldb.LevelDBOptions
} }
// NewLevelLabelNameFingerprintIndex returns a LevelDBLabelNameFingerprintIndex
// ready to use.
func NewLevelLabelNameFingerprintIndex(o LevelDBLabelNameFingerprintIndexOptions) (*LevelDBLabelNameFingerprintIndex, error) { func NewLevelLabelNameFingerprintIndex(o LevelDBLabelNameFingerprintIndexOptions) (*LevelDBLabelNameFingerprintIndex, error) {
s, err := leveldb.NewLevelDBPersistence(o.LevelDBOptions) s, err := leveldb.NewLevelDBPersistence(o.LevelDBOptions)
if err != nil { if err != nil {
@ -210,31 +189,38 @@ func NewLevelLabelNameFingerprintIndex(o LevelDBLabelNameFingerprintIndexOptions
} }
return &LevelDBLabelNameFingerprintIndex{ return &LevelDBLabelNameFingerprintIndex{
p: s, LevelDBPersistence: s,
}, nil }, nil
} }
// LabelPairFingerprintMapping is an in-memory map of LabelPairs to
// Fingerprints.
type LabelPairFingerprintMapping map[LabelPair]clientmodel.Fingerprints type LabelPairFingerprintMapping map[LabelPair]clientmodel.Fingerprints
// LabelPairFingerprintIndex models a database mapping LabelPairs to
// Fingerprints.
type LabelPairFingerprintIndex interface { type LabelPairFingerprintIndex interface {
raw.Database
raw.ForEacher raw.ForEacher
raw.Pruner raw.Pruner
IndexBatch(LabelPairFingerprintMapping) error IndexBatch(LabelPairFingerprintMapping) error
Lookup(*LabelPair) (m clientmodel.Fingerprints, ok bool, err error) Lookup(*LabelPair) (m clientmodel.Fingerprints, ok bool, err error)
Has(*LabelPair) (ok bool, err error) Has(*LabelPair) (ok bool, err error)
State() *raw.DatabaseState
Size() (s uint64, present bool, err error)
} }
// LevelDBLabelPairFingerprintIndex implements LabelPairFingerprintIndex using
// leveldb.
type LevelDBLabelPairFingerprintIndex struct { type LevelDBLabelPairFingerprintIndex struct {
p *leveldb.LevelDBPersistence *leveldb.LevelDBPersistence
} }
// LevelDBLabelSetFingerprintIndexOptions just wraps leveldb.LevelDBOptions.
type LevelDBLabelSetFingerprintIndexOptions struct { type LevelDBLabelSetFingerprintIndexOptions struct {
leveldb.LevelDBOptions leveldb.LevelDBOptions
} }
// IndexBatch implements LabelPairFingerprintMapping.
func (i *LevelDBLabelPairFingerprintIndex) IndexBatch(m LabelPairFingerprintMapping) error { func (i *LevelDBLabelPairFingerprintIndex) IndexBatch(m LabelPairFingerprintMapping) error {
batch := leveldb.NewBatch() batch := leveldb.NewBatch()
defer batch.Close() defer batch.Close()
@ -246,9 +232,9 @@ func (i *LevelDBLabelPairFingerprintIndex) IndexBatch(m LabelPairFingerprintMapp
Name: proto.String(string(pair.Name)), Name: proto.String(string(pair.Name)),
Value: proto.String(string(pair.Value)), Value: proto.String(string(pair.Value)),
} }
value := new(dto.FingerprintCollection) value := &dto.FingerprintCollection{}
for _, fp := range fps { for _, fp := range fps {
f := new(dto.Fingerprint) f := &dto.Fingerprint{}
dumpFingerprint(f, fp) dumpFingerprint(f, fp)
value.Member = append(value.Member, f) value.Member = append(value.Member, f)
} }
@ -256,17 +242,18 @@ func (i *LevelDBLabelPairFingerprintIndex) IndexBatch(m LabelPairFingerprintMapp
batch.Put(key, value) batch.Put(key, value)
} }
return i.p.Commit(batch) return i.LevelDBPersistence.Commit(batch)
} }
// Lookup implements LabelPairFingerprintMapping.
func (i *LevelDBLabelPairFingerprintIndex) Lookup(p *LabelPair) (m clientmodel.Fingerprints, ok bool, err error) { func (i *LevelDBLabelPairFingerprintIndex) Lookup(p *LabelPair) (m clientmodel.Fingerprints, ok bool, err error) {
k := &dto.LabelPair{ k := &dto.LabelPair{
Name: proto.String(string(p.Name)), Name: proto.String(string(p.Name)),
Value: proto.String(string(p.Value)), Value: proto.String(string(p.Value)),
} }
v := new(dto.FingerprintCollection) v := &dto.FingerprintCollection{}
ok, err = i.p.Get(k, v) ok, err = i.LevelDBPersistence.Get(k, v)
if !ok { if !ok {
return nil, false, nil return nil, false, nil
@ -276,7 +263,7 @@ func (i *LevelDBLabelPairFingerprintIndex) Lookup(p *LabelPair) (m clientmodel.F
} }
for _, pair := range v.Member { for _, pair := range v.Member {
fp := new(clientmodel.Fingerprint) fp := &clientmodel.Fingerprint{}
loadFingerprint(fp, pair) loadFingerprint(fp, pair)
m = append(m, fp) m = append(m, fp)
} }
@ -284,37 +271,18 @@ func (i *LevelDBLabelPairFingerprintIndex) Lookup(p *LabelPair) (m clientmodel.F
return m, true, nil return m, true, nil
} }
// Has implements LabelPairFingerprintMapping.
func (i *LevelDBLabelPairFingerprintIndex) Has(p *LabelPair) (ok bool, err error) { func (i *LevelDBLabelPairFingerprintIndex) Has(p *LabelPair) (ok bool, err error) {
k := &dto.LabelPair{ k := &dto.LabelPair{
Name: proto.String(string(p.Name)), Name: proto.String(string(p.Name)),
Value: proto.String(string(p.Value)), Value: proto.String(string(p.Value)),
} }
return i.p.Has(k) return i.LevelDBPersistence.Has(k)
}
func (i *LevelDBLabelPairFingerprintIndex) ForEach(d storage.RecordDecoder, f storage.RecordFilter, o storage.RecordOperator) (bool, error) {
return i.p.ForEach(d, f, o)
}
func (i *LevelDBLabelPairFingerprintIndex) Prune() (bool, error) {
i.p.Prune()
return false, nil
}
func (i *LevelDBLabelPairFingerprintIndex) Close() {
i.p.Close()
}
func (i *LevelDBLabelPairFingerprintIndex) Size() (uint64, bool, error) {
s, err := i.p.Size()
return s, true, err
}
func (i *LevelDBLabelPairFingerprintIndex) State() *raw.DatabaseState {
return i.p.State()
} }
// NewLevelDBLabelSetFingerprintIndex returns a LevelDBLabelPairFingerprintIndex
// object ready to use.
func NewLevelDBLabelSetFingerprintIndex(o LevelDBLabelSetFingerprintIndexOptions) (*LevelDBLabelPairFingerprintIndex, error) { func NewLevelDBLabelSetFingerprintIndex(o LevelDBLabelSetFingerprintIndexOptions) (*LevelDBLabelPairFingerprintIndex, error) {
s, err := leveldb.NewLevelDBPersistence(o.LevelDBOptions) s, err := leveldb.NewLevelDBPersistence(o.LevelDBOptions)
if err != nil { if err != nil {
@ -322,68 +290,55 @@ func NewLevelDBLabelSetFingerprintIndex(o LevelDBLabelSetFingerprintIndexOptions
} }
return &LevelDBLabelPairFingerprintIndex{ return &LevelDBLabelPairFingerprintIndex{
p: s, LevelDBPersistence: s,
}, nil }, nil
} }
// MetricMembershipIndex models a database tracking the existence of Metrics.
type MetricMembershipIndex interface { type MetricMembershipIndex interface {
raw.Database
raw.Pruner raw.Pruner
IndexBatch(FingerprintMetricMapping) error IndexBatch(FingerprintMetricMapping) error
Has(clientmodel.Metric) (ok bool, err error) Has(clientmodel.Metric) (ok bool, err error)
State() *raw.DatabaseState
Size() (s uint64, present bool, err error)
} }
// LevelDBMetricMembershipIndex implements MetricMembershipIndex using leveldb.
type LevelDBMetricMembershipIndex struct { type LevelDBMetricMembershipIndex struct {
p *leveldb.LevelDBPersistence *leveldb.LevelDBPersistence
} }
var existenceIdentity = new(dto.MembershipIndexValue) var existenceIdentity = &dto.MembershipIndexValue{}
// IndexBatch implements MetricMembershipIndex.
func (i *LevelDBMetricMembershipIndex) IndexBatch(b FingerprintMetricMapping) error { func (i *LevelDBMetricMembershipIndex) IndexBatch(b FingerprintMetricMapping) error {
batch := leveldb.NewBatch() batch := leveldb.NewBatch()
defer batch.Close() defer batch.Close()
for _, m := range b { for _, m := range b {
k := new(dto.Metric) k := &dto.Metric{}
dumpMetric(k, m) dumpMetric(k, m)
batch.Put(k, existenceIdentity) batch.Put(k, existenceIdentity)
} }
return i.p.Commit(batch) return i.LevelDBPersistence.Commit(batch)
} }
// Has implements MetricMembershipIndex.
func (i *LevelDBMetricMembershipIndex) Has(m clientmodel.Metric) (ok bool, err error) { func (i *LevelDBMetricMembershipIndex) Has(m clientmodel.Metric) (ok bool, err error) {
k := new(dto.Metric) k := &dto.Metric{}
dumpMetric(k, m) dumpMetric(k, m)
return i.p.Has(k) return i.LevelDBPersistence.Has(k)
}
func (i *LevelDBMetricMembershipIndex) Close() {
i.p.Close()
}
func (i *LevelDBMetricMembershipIndex) Size() (uint64, bool, error) {
s, err := i.p.Size()
return s, true, err
}
func (i *LevelDBMetricMembershipIndex) State() *raw.DatabaseState {
return i.p.State()
}
func (i *LevelDBMetricMembershipIndex) Prune() (bool, error) {
i.p.Prune()
return false, nil
} }
// LevelDBMetricMembershipIndexOptions just wraps leveldb.LevelDBOptions
type LevelDBMetricMembershipIndexOptions struct { type LevelDBMetricMembershipIndexOptions struct {
leveldb.LevelDBOptions leveldb.LevelDBOptions
} }
// NewLevelDBMetricMembershipIndex returns a LevelDBMetricMembershipIndex object
// ready to use.
func NewLevelDBMetricMembershipIndex(o LevelDBMetricMembershipIndexOptions) (*LevelDBMetricMembershipIndex, error) { func NewLevelDBMetricMembershipIndex(o LevelDBMetricMembershipIndexOptions) (*LevelDBMetricMembershipIndex, error) {
s, err := leveldb.NewLevelDBPersistence(o.LevelDBOptions) s, err := leveldb.NewLevelDBPersistence(o.LevelDBOptions)
if err != nil { if err != nil {
@ -391,7 +346,7 @@ func NewLevelDBMetricMembershipIndex(o LevelDBMetricMembershipIndexOptions) (*Le
} }
return &LevelDBMetricMembershipIndex{ return &LevelDBMetricMembershipIndex{
p: s, LevelDBPersistence: s,
}, nil }, nil
} }
@ -402,7 +357,7 @@ type MetricIndexer interface {
IndexMetrics(FingerprintMetricMapping) error IndexMetrics(FingerprintMetricMapping) error
} }
// IndexObserver listens and receives changes to a given // IndexerObserver listens and receives changes to a given
// FingerprintMetricMapping. // FingerprintMetricMapping.
type IndexerObserver interface { type IndexerObserver interface {
Observe(FingerprintMetricMapping) error Observe(FingerprintMetricMapping) error
@ -422,6 +377,8 @@ type IndexerProxy struct {
observers []IndexerObserver observers []IndexerObserver
} }
// IndexMetrics proxies the given FingerprintMetricMapping to the underlying
// MetricIndexer and calls all registered observers with it.
func (p *IndexerProxy) IndexMetrics(b FingerprintMetricMapping) error { func (p *IndexerProxy) IndexMetrics(b FingerprintMetricMapping) error {
if p.err != nil { if p.err != nil {
return p.err return p.err
@ -451,7 +408,7 @@ func (p *IndexerProxy) Close() error {
return nil return nil
} }
// Close flushes the underlying index requests before closing. // Flush flushes the underlying index requests before closing.
func (p *IndexerProxy) Flush() error { func (p *IndexerProxy) Flush() error {
if p.err != nil { if p.err != nil {
return p.err return p.err
@ -477,6 +434,8 @@ type SynchronizedIndexer struct {
i MetricIndexer i MetricIndexer
} }
// IndexMetrics calls IndexMetrics of the wrapped MetricIndexer after acquiring
// a lock.
func (i *SynchronizedIndexer) IndexMetrics(b FingerprintMetricMapping) error { func (i *SynchronizedIndexer) IndexMetrics(b FingerprintMetricMapping) error {
i.mu.Lock() i.mu.Lock()
defer i.mu.Unlock() defer i.mu.Unlock()
@ -488,6 +447,8 @@ type flusher interface {
Flush() error Flush() error
} }
// Flush calls Flush of the wrapped MetricIndexer after acquiring a lock. If the
// wrapped MetricIndexer has no Flush method, this is a no-op.
func (i *SynchronizedIndexer) Flush() error { func (i *SynchronizedIndexer) Flush() error {
if flusher, ok := i.i.(flusher); ok { if flusher, ok := i.i.(flusher); ok {
i.mu.Lock() i.mu.Lock()
@ -499,6 +460,8 @@ func (i *SynchronizedIndexer) Flush() error {
return nil return nil
} }
// Close calls Close of the wrapped MetricIndexer after acquiring a lock. If the
// wrapped MetricIndexer has no Close method, this is a no-op.
func (i *SynchronizedIndexer) Close() error { func (i *SynchronizedIndexer) Close() error {
if closer, ok := i.i.(io.Closer); ok { if closer, ok := i.i.(io.Closer); ok {
i.mu.Lock() i.mu.Lock()
@ -510,7 +473,8 @@ func (i *SynchronizedIndexer) Close() error {
return nil return nil
} }
// NewSynchronizedIndexer builds a new MetricIndexer. // NewSynchronizedIndexer returns a SynchronizedIndexer wrapping the given
// MetricIndexer.
func NewSynchronizedIndexer(i MetricIndexer) *SynchronizedIndexer { func NewSynchronizedIndexer(i MetricIndexer) *SynchronizedIndexer {
return &SynchronizedIndexer{ return &SynchronizedIndexer{
i: i, i: i,
@ -531,6 +495,8 @@ type BufferedIndexer struct {
err error err error
} }
// IndexMetrics writes the entries in the given FingerprintMetricMapping to the
// index.
func (i *BufferedIndexer) IndexMetrics(b FingerprintMetricMapping) error { func (i *BufferedIndexer) IndexMetrics(b FingerprintMetricMapping) error {
if i.err != nil { if i.err != nil {
return i.err return i.err
@ -542,8 +508,6 @@ func (i *BufferedIndexer) IndexMetrics(b FingerprintMetricMapping) error {
return nil return nil
} }
i.buf = append(i.buf)
i.err = i.Flush() i.err = i.Flush()
return i.err return i.err
@ -590,8 +554,8 @@ func (i *BufferedIndexer) Close() error {
return nil return nil
} }
// NewBufferedIndexer returns a BufferedIndexer ready to use.
func NewBufferedIndexer(i MetricIndexer, limit int) *BufferedIndexer { func NewBufferedIndexer(i MetricIndexer, limit int) *BufferedIndexer {
return &BufferedIndexer{ return &BufferedIndexer{
i: i, i: i,
limit: limit, limit: limit,
@ -715,6 +679,8 @@ func extendLabelPairIndex(i LabelPairFingerprintIndex, b FingerprintMetricMappin
return batch, nil return batch, nil
} }
// IndexMetrics adds the facets of all unindexed metrics found in the given
// FingerprintMetricMapping to the corresponding indices.
func (i *TotalIndexer) IndexMetrics(b FingerprintMetricMapping) error { func (i *TotalIndexer) IndexMetrics(b FingerprintMetricMapping) error {
unindexed, err := findUnindexed(i.MetricMembership, b) unindexed, err := findUnindexed(i.MetricMembership, b)
if err != nil { if err != nil {

View file

@ -13,11 +13,7 @@
package metric package metric
import ( import clientmodel "github.com/prometheus/client_golang/model"
clientmodel "github.com/prometheus/client_golang/model"
"github.com/prometheus/prometheus/storage"
)
// AppendBatch models a batch of samples to be stored. // AppendBatch models a batch of samples to be stored.
type AppendBatch map[clientmodel.Fingerprint]SampleSet type AppendBatch map[clientmodel.Fingerprint]SampleSet
@ -32,24 +28,17 @@ type MetricPersistence interface {
// Record a group of new samples in the storage layer. // Record a group of new samples in the storage layer.
AppendSamples(clientmodel.Samples) error AppendSamples(clientmodel.Samples) error
// Get all of the metric fingerprints that are associated with the provided // Get all of the metric fingerprints that are associated with the
// label set. // provided label set.
GetFingerprintsForLabelSet(clientmodel.LabelSet) (clientmodel.Fingerprints, error) GetFingerprintsForLabelSet(clientmodel.LabelSet) (clientmodel.Fingerprints, error)
// Get all of the metric fingerprints that are associated for a given label // Get all of the metric fingerprints that are associated for a given
// name. // label name.
GetFingerprintsForLabelName(clientmodel.LabelName) (clientmodel.Fingerprints, error) GetFingerprintsForLabelName(clientmodel.LabelName) (clientmodel.Fingerprints, error)
// Get the metric associated with the provided fingerprint. // Get the metric associated with the provided fingerprint.
GetMetricForFingerprint(*clientmodel.Fingerprint) (clientmodel.Metric, error) GetMetricForFingerprint(*clientmodel.Fingerprint) (clientmodel.Metric, error)
// Get the two metric values that are immediately adjacent to a given time.
GetValueAtTime(*clientmodel.Fingerprint, clientmodel.Timestamp) Values
// Get the boundary values of an interval: the first value older than the
// interval start, and the first value younger than the interval end.
GetBoundaryValues(*clientmodel.Fingerprint, Interval) Values
// Get all values contained within a provided interval.
GetRangeValues(*clientmodel.Fingerprint, Interval) Values
// Get all label values that are associated with a given label name. // Get all label values that are associated with a given label name.
GetAllValuesForLabel(clientmodel.LabelName) (clientmodel.LabelValues, error) GetAllValuesForLabel(clientmodel.LabelName) (clientmodel.LabelValues, error)
} }
@ -57,19 +46,19 @@ type MetricPersistence interface {
// View provides a view of the values in the datastore subject to the request // View provides a view of the values in the datastore subject to the request
// of a preloading operation. // of a preloading operation.
type View interface { type View interface {
// Get the two values that are immediately adjacent to a given time.
GetValueAtTime(*clientmodel.Fingerprint, clientmodel.Timestamp) Values GetValueAtTime(*clientmodel.Fingerprint, clientmodel.Timestamp) Values
// Get the boundary values of an interval: the first value older than
// the interval start, and the first value younger than the interval
// end.
GetBoundaryValues(*clientmodel.Fingerprint, Interval) Values GetBoundaryValues(*clientmodel.Fingerprint, Interval) Values
// Get all values contained within a provided interval.
GetRangeValues(*clientmodel.Fingerprint, Interval) Values GetRangeValues(*clientmodel.Fingerprint, Interval) Values
// Destroy this view.
Close()
} }
type Series interface { // ViewableMetricPersistence is a MetricPersistence that is able to present the
Fingerprint() *clientmodel.Fingerprint // samples it has stored as a View.
Metric() clientmodel.Metric type ViewableMetricPersistence interface {
} MetricPersistence
View
type IteratorsForFingerprintBuilder interface {
ForStream(stream *stream) (storage.RecordDecoder, storage.RecordFilter, storage.RecordOperator)
} }

View file

@ -17,11 +17,14 @@ import (
clientmodel "github.com/prometheus/client_golang/model" clientmodel "github.com/prometheus/client_golang/model"
) )
// LabelPair pairs a name with a value.
type LabelPair struct { type LabelPair struct {
Name clientmodel.LabelName Name clientmodel.LabelName
Value clientmodel.LabelValue Value clientmodel.LabelValue
} }
// Equal returns true iff both the Name and the Value of this LabelPair and o
// are equal.
func (l *LabelPair) Equal(o *LabelPair) bool { func (l *LabelPair) Equal(o *LabelPair) bool {
switch { switch {
case l.Name != o.Name: case l.Name != o.Name:
@ -33,6 +36,8 @@ func (l *LabelPair) Equal(o *LabelPair) bool {
} }
} }
// LabelPairs is a sortable slice of LabelPair pointers. It implements
// sort.Interface.
type LabelPairs []*LabelPair type LabelPairs []*LabelPair
func (l LabelPairs) Len() int { func (l LabelPairs) Len() int {

View file

@ -35,11 +35,12 @@ import (
const sortConcurrency = 2 const sortConcurrency = 2
// LevelDBMetricPersistence is a leveldb-backed persistence layer for metrics.
type LevelDBMetricPersistence struct { type LevelDBMetricPersistence struct {
CurationRemarks CurationRemarker CurationRemarks CurationRemarker
FingerprintToMetrics FingerprintMetricIndex FingerprintToMetrics FingerprintMetricIndex
LabelNameToFingerprints LabelNameFingerprintIndex LabelNameToFingerprints LabelNameFingerprintIndex
LabelSetToFingerprints LabelPairFingerprintIndex LabelPairToFingerprints LabelPairFingerprintIndex
MetricHighWatermarks HighWatermarker MetricHighWatermarks HighWatermarker
MetricMembershipIndex MetricMembershipIndex MetricMembershipIndex MetricMembershipIndex
@ -63,8 +64,8 @@ type LevelDBMetricPersistence struct {
var ( var (
leveldbChunkSize = flag.Int("leveldbChunkSize", 200, "Maximum number of samples stored under one key.") leveldbChunkSize = flag.Int("leveldbChunkSize", 200, "Maximum number of samples stored under one key.")
// These flag values are back of the envelope, though they seem sensible. // These flag values are back of the envelope, though they seem
// Please re-evaluate based on your own needs. // sensible. Please re-evaluate based on your own needs.
curationRemarksCacheSize = flag.Int("curationRemarksCacheSize", 5*1024*1024, "The size for the curation remarks cache (bytes).") curationRemarksCacheSize = flag.Int("curationRemarksCacheSize", 5*1024*1024, "The size for the curation remarks cache (bytes).")
fingerprintsToLabelPairCacheSize = flag.Int("fingerprintsToLabelPairCacheSizeBytes", 25*1024*1024, "The size for the fingerprint to label pair index (bytes).") fingerprintsToLabelPairCacheSize = flag.Int("fingerprintsToLabelPairCacheSizeBytes", 25*1024*1024, "The size for the fingerprint to label pair index (bytes).")
highWatermarkCacheSize = flag.Int("highWatermarksByFingerprintSizeBytes", 5*1024*1024, "The size for the metric high watermarks (bytes).") highWatermarkCacheSize = flag.Int("highWatermarksByFingerprintSizeBytes", 5*1024*1024, "The size for the metric high watermarks (bytes).")
@ -75,19 +76,15 @@ var (
) )
type leveldbOpener func() type leveldbOpener func()
type errorCloser interface {
Close() error
}
type closer interface {
Close()
}
// Close closes all the underlying persistence layers. It implements the
// MetricPersistence interface.
func (l *LevelDBMetricPersistence) Close() { func (l *LevelDBMetricPersistence) Close() {
var persistences = []interface{}{ var persistences = []raw.Database{
l.CurationRemarks, l.CurationRemarks,
l.FingerprintToMetrics, l.FingerprintToMetrics,
l.LabelNameToFingerprints, l.LabelNameToFingerprints,
l.LabelSetToFingerprints, l.LabelPairToFingerprints,
l.MetricHighWatermarks, l.MetricHighWatermarks,
l.MetricMembershipIndex, l.MetricMembershipIndex,
l.MetricSamples, l.MetricSamples,
@ -97,17 +94,12 @@ func (l *LevelDBMetricPersistence) Close() {
for _, c := range persistences { for _, c := range persistences {
closerGroup.Add(1) closerGroup.Add(1)
go func(c interface{}) { go func(c raw.Database) {
if c != nil { if c != nil {
switch closer := c.(type) { if err := c.Close(); err != nil {
case closer:
closer.Close()
case errorCloser:
if err := closer.Close(); err != nil {
glog.Error("Error closing persistence: ", err) glog.Error("Error closing persistence: ", err)
} }
} }
}
closerGroup.Done() closerGroup.Done()
}(c) }(c)
} }
@ -115,10 +107,12 @@ func (l *LevelDBMetricPersistence) Close() {
closerGroup.Wait() closerGroup.Wait()
} }
// NewLevelDBMetricPersistence returns a LevelDBMetricPersistence object ready
// to use.
func NewLevelDBMetricPersistence(baseDirectory string) (*LevelDBMetricPersistence, error) { func NewLevelDBMetricPersistence(baseDirectory string) (*LevelDBMetricPersistence, error) {
workers := utility.NewUncertaintyGroup(7) workers := utility.NewUncertaintyGroup(7)
emission := new(LevelDBMetricPersistence) emission := &LevelDBMetricPersistence{}
var subsystemOpeners = []struct { var subsystemOpeners = []struct {
name string name string
@ -185,7 +179,7 @@ func NewLevelDBMetricPersistence(baseDirectory string) (*LevelDBMetricPersistenc
"Fingerprints by Label Name and Value Pair", "Fingerprints by Label Name and Value Pair",
func() { func() {
var err error var err error
emission.LabelSetToFingerprints, err = NewLevelDBLabelSetFingerprintIndex(LevelDBLabelSetFingerprintIndexOptions{ emission.LabelPairToFingerprints, err = NewLevelDBLabelSetFingerprintIndex(LevelDBLabelSetFingerprintIndexOptions{
LevelDBOptions: leveldb.LevelDBOptions{ LevelDBOptions: leveldb.LevelDBOptions{
Name: "Fingerprints by Label Pair", Name: "Fingerprints by Label Pair",
Purpose: "Index", Purpose: "Index",
@ -239,19 +233,20 @@ func NewLevelDBMetricPersistence(baseDirectory string) (*LevelDBMetricPersistenc
glog.Error("Could not open storage: ", err) glog.Error("Could not open storage: ", err)
} }
return nil, fmt.Errorf("Unable to open metric persistence.") return nil, fmt.Errorf("unable to open metric persistence")
} }
emission.Indexer = &TotalIndexer{ emission.Indexer = &TotalIndexer{
FingerprintToMetric: emission.FingerprintToMetrics, FingerprintToMetric: emission.FingerprintToMetrics,
LabelNameToFingerprint: emission.LabelNameToFingerprints, LabelNameToFingerprint: emission.LabelNameToFingerprints,
LabelPairToFingerprint: emission.LabelSetToFingerprints, LabelPairToFingerprint: emission.LabelPairToFingerprints,
MetricMembership: emission.MetricMembershipIndex, MetricMembership: emission.MetricMembershipIndex,
} }
return emission, nil return emission, nil
} }
// AppendSample implements the MetricPersistence interface.
func (l *LevelDBMetricPersistence) AppendSample(sample *clientmodel.Sample) (err error) { func (l *LevelDBMetricPersistence) AppendSample(sample *clientmodel.Sample) (err error) {
defer func(begin time.Time) { defer func(begin time.Time) {
duration := time.Since(begin) duration := time.Since(begin)
@ -317,6 +312,7 @@ func (l *LevelDBMetricPersistence) refreshHighWatermarks(groups map[clientmodel.
return l.MetricHighWatermarks.UpdateBatch(b) return l.MetricHighWatermarks.UpdateBatch(b)
} }
// AppendSamples appends the given Samples to the database and indexes them.
func (l *LevelDBMetricPersistence) AppendSamples(samples clientmodel.Samples) (err error) { func (l *LevelDBMetricPersistence) AppendSamples(samples clientmodel.Samples) (err error) {
defer func(begin time.Time) { defer func(begin time.Time) {
duration := time.Since(begin) duration := time.Since(begin)
@ -345,9 +341,9 @@ func (l *LevelDBMetricPersistence) AppendSamples(samples clientmodel.Samples) (e
samplesBatch := leveldb.NewBatch() samplesBatch := leveldb.NewBatch()
defer samplesBatch.Close() defer samplesBatch.Close()
key := new(SampleKey) key := &SampleKey{}
keyDto := new(dto.SampleKey) keyDto := &dto.SampleKey{}
value := new(dto.SampleValueSeries) value := &dto.SampleValueSeries{}
for fingerprint, group := range fingerprintToSamples { for fingerprint, group := range fingerprintToSamples {
for { for {
@ -434,6 +430,8 @@ func (l *LevelDBMetricPersistence) hasIndexMetric(m clientmodel.Metric) (value b
return l.MetricMembershipIndex.Has(m) return l.MetricMembershipIndex.Has(m)
} }
// HasLabelPair returns true if the given LabelPair is present in the underlying
// LabelPair index.
func (l *LevelDBMetricPersistence) HasLabelPair(p *LabelPair) (value bool, err error) { func (l *LevelDBMetricPersistence) HasLabelPair(p *LabelPair) (value bool, err error) {
defer func(begin time.Time) { defer func(begin time.Time) {
duration := time.Since(begin) duration := time.Since(begin)
@ -441,9 +439,11 @@ func (l *LevelDBMetricPersistence) HasLabelPair(p *LabelPair) (value bool, err e
recordOutcome(duration, err, map[string]string{operation: hasLabelPair, result: success}, map[string]string{operation: hasLabelPair, result: failure}) recordOutcome(duration, err, map[string]string{operation: hasLabelPair, result: success}, map[string]string{operation: hasLabelPair, result: failure})
}(time.Now()) }(time.Now())
return l.LabelSetToFingerprints.Has(p) return l.LabelPairToFingerprints.Has(p)
} }
// HasLabelName returns true if the given LabelName is present in the underlying
// LabelName index.
func (l *LevelDBMetricPersistence) HasLabelName(n clientmodel.LabelName) (value bool, err error) { func (l *LevelDBMetricPersistence) HasLabelName(n clientmodel.LabelName) (value bool, err error) {
defer func(begin time.Time) { defer func(begin time.Time) {
duration := time.Since(begin) duration := time.Since(begin)
@ -456,6 +456,9 @@ func (l *LevelDBMetricPersistence) HasLabelName(n clientmodel.LabelName) (value
return return
} }
// GetFingerprintsForLabelSet returns the Fingerprints for the given LabelSet by
// querying the underlying LabelPairFingerprintIndex for each LabelPair
// contained in LabelSet. It implements the MetricPersistence interface.
func (l *LevelDBMetricPersistence) GetFingerprintsForLabelSet(labelSet clientmodel.LabelSet) (fps clientmodel.Fingerprints, err error) { func (l *LevelDBMetricPersistence) GetFingerprintsForLabelSet(labelSet clientmodel.LabelSet) (fps clientmodel.Fingerprints, err error) {
defer func(begin time.Time) { defer func(begin time.Time) {
duration := time.Since(begin) duration := time.Since(begin)
@ -466,7 +469,7 @@ func (l *LevelDBMetricPersistence) GetFingerprintsForLabelSet(labelSet clientmod
sets := []utility.Set{} sets := []utility.Set{}
for name, value := range labelSet { for name, value := range labelSet {
fps, _, err := l.LabelSetToFingerprints.Lookup(&LabelPair{ fps, _, err := l.LabelPairToFingerprints.Lookup(&LabelPair{
Name: name, Name: name,
Value: value, Value: value,
}) })
@ -500,6 +503,9 @@ func (l *LevelDBMetricPersistence) GetFingerprintsForLabelSet(labelSet clientmod
return fps, nil return fps, nil
} }
// GetFingerprintsForLabelName returns the Fingerprints for the given LabelName
// from the underlying LabelNameFingerprintIndex. It implements the
// MetricPersistence interface.
func (l *LevelDBMetricPersistence) GetFingerprintsForLabelName(labelName clientmodel.LabelName) (fps clientmodel.Fingerprints, err error) { func (l *LevelDBMetricPersistence) GetFingerprintsForLabelName(labelName clientmodel.LabelName) (fps clientmodel.Fingerprints, err error) {
defer func(begin time.Time) { defer func(begin time.Time) {
duration := time.Since(begin) duration := time.Since(begin)
@ -513,6 +519,9 @@ func (l *LevelDBMetricPersistence) GetFingerprintsForLabelName(labelName clientm
return fps, err return fps, err
} }
// GetMetricForFingerprint returns the Metric for the given Fingerprint from the
// underlying FingerprintMetricIndex. It implements the MetricPersistence
// interface.
func (l *LevelDBMetricPersistence) GetMetricForFingerprint(f *clientmodel.Fingerprint) (m clientmodel.Metric, err error) { func (l *LevelDBMetricPersistence) GetMetricForFingerprint(f *clientmodel.Fingerprint) (m clientmodel.Metric, err error) {
defer func(begin time.Time) { defer func(begin time.Time) {
duration := time.Since(begin) duration := time.Since(begin)
@ -526,68 +535,15 @@ func (l *LevelDBMetricPersistence) GetMetricForFingerprint(f *clientmodel.Finger
return m, nil return m, nil
} }
func (l *LevelDBMetricPersistence) GetValueAtTime(f *clientmodel.Fingerprint, t clientmodel.Timestamp) Values { // GetAllValuesForLabel gets all label values that are associated with the
panic("Not implemented") // provided label name.
}
func (l *LevelDBMetricPersistence) GetBoundaryValues(f *clientmodel.Fingerprint, i Interval) Values {
panic("Not implemented")
}
func (l *LevelDBMetricPersistence) GetRangeValues(f *clientmodel.Fingerprint, i Interval) Values {
panic("Not implemented")
}
type MetricKeyDecoder struct{}
func (d *MetricKeyDecoder) DecodeKey(in interface{}) (out interface{}, err error) {
unmarshaled := dto.LabelPair{}
err = proto.Unmarshal(in.([]byte), &unmarshaled)
if err != nil {
return
}
out = LabelPair{
Name: clientmodel.LabelName(*unmarshaled.Name),
Value: clientmodel.LabelValue(*unmarshaled.Value),
}
return
}
func (d *MetricKeyDecoder) DecodeValue(in interface{}) (out interface{}, err error) {
return
}
type LabelNameFilter struct {
labelName clientmodel.LabelName
}
func (f LabelNameFilter) Filter(key, value interface{}) (filterResult storage.FilterResult) {
labelPair, ok := key.(LabelPair)
if ok && labelPair.Name == f.labelName {
return storage.ACCEPT
}
return storage.SKIP
}
type CollectLabelValuesOp struct {
labelValues []clientmodel.LabelValue
}
func (op *CollectLabelValuesOp) Operate(key, value interface{}) (err *storage.OperatorError) {
labelPair := key.(LabelPair)
op.labelValues = append(op.labelValues, clientmodel.LabelValue(labelPair.Value))
return
}
func (l *LevelDBMetricPersistence) GetAllValuesForLabel(labelName clientmodel.LabelName) (values clientmodel.LabelValues, err error) { func (l *LevelDBMetricPersistence) GetAllValuesForLabel(labelName clientmodel.LabelName) (values clientmodel.LabelValues, err error) {
filter := &LabelNameFilter{ filter := &LabelNameFilter{
labelName: labelName, labelName: labelName,
} }
labelValuesOp := &CollectLabelValuesOp{} labelValuesOp := &CollectLabelValuesOp{}
_, err = l.LabelSetToFingerprints.ForEach(&MetricKeyDecoder{}, filter, labelValuesOp) _, err = l.LabelPairToFingerprints.ForEach(&MetricKeyDecoder{}, filter, labelValuesOp)
if err != nil { if err != nil {
return return
} }
@ -604,41 +560,42 @@ func (l *LevelDBMetricPersistence) Prune() {
l.CurationRemarks.Prune() l.CurationRemarks.Prune()
l.FingerprintToMetrics.Prune() l.FingerprintToMetrics.Prune()
l.LabelNameToFingerprints.Prune() l.LabelNameToFingerprints.Prune()
l.LabelSetToFingerprints.Prune() l.LabelPairToFingerprints.Prune()
l.MetricHighWatermarks.Prune() l.MetricHighWatermarks.Prune()
l.MetricMembershipIndex.Prune() l.MetricMembershipIndex.Prune()
l.MetricSamples.Prune() l.MetricSamples.Prune()
} }
// Sizes returns the sum of all sizes of the underlying databases.
func (l *LevelDBMetricPersistence) Sizes() (total uint64, err error) { func (l *LevelDBMetricPersistence) Sizes() (total uint64, err error) {
size := uint64(0) size := uint64(0)
if size, _, err = l.CurationRemarks.Size(); err != nil { if size, err = l.CurationRemarks.Size(); err != nil {
return 0, err return 0, err
} }
total += size total += size
if size, _, err = l.FingerprintToMetrics.Size(); err != nil { if size, err = l.FingerprintToMetrics.Size(); err != nil {
return 0, err return 0, err
} }
total += size total += size
if size, _, err = l.LabelNameToFingerprints.Size(); err != nil { if size, err = l.LabelNameToFingerprints.Size(); err != nil {
return 0, err return 0, err
} }
total += size total += size
if size, _, err = l.LabelSetToFingerprints.Size(); err != nil { if size, err = l.LabelPairToFingerprints.Size(); err != nil {
return 0, err return 0, err
} }
total += size total += size
if size, _, err = l.MetricHighWatermarks.Size(); err != nil { if size, err = l.MetricHighWatermarks.Size(); err != nil {
return 0, err return 0, err
} }
total += size total += size
if size, _, err = l.MetricMembershipIndex.Size(); err != nil { if size, err = l.MetricMembershipIndex.Size(); err != nil {
return 0, err return 0, err
} }
total += size total += size
@ -651,20 +608,64 @@ func (l *LevelDBMetricPersistence) Sizes() (total uint64, err error) {
return total, nil return total, nil
} }
// States returns the DatabaseStates of all underlying databases.
func (l *LevelDBMetricPersistence) States() raw.DatabaseStates { func (l *LevelDBMetricPersistence) States() raw.DatabaseStates {
return raw.DatabaseStates{ return raw.DatabaseStates{
l.CurationRemarks.State(), l.CurationRemarks.State(),
l.FingerprintToMetrics.State(), l.FingerprintToMetrics.State(),
l.LabelNameToFingerprints.State(), l.LabelNameToFingerprints.State(),
l.LabelSetToFingerprints.State(), l.LabelPairToFingerprints.State(),
l.MetricHighWatermarks.State(), l.MetricHighWatermarks.State(),
l.MetricMembershipIndex.State(), l.MetricMembershipIndex.State(),
l.MetricSamples.State(), l.MetricSamples.State(),
} }
} }
// CollectLabelValuesOp implements storage.RecordOperator. It collects the
// encountered LabelValues in a slice.
type CollectLabelValuesOp struct {
labelValues []clientmodel.LabelValue
}
// Operate implements storage.RecordOperator. 'key' is required to be a
// LabelPair. Its Value is appended to a slice of collected LabelValues.
func (op *CollectLabelValuesOp) Operate(key, value interface{}) (err *storage.OperatorError) {
labelPair := key.(LabelPair)
op.labelValues = append(op.labelValues, labelPair.Value)
return
}
// MetricKeyDecoder implements storage.RecordDecoder for LabelPairs.
type MetricKeyDecoder struct{}
// DecodeKey implements storage.RecordDecoder. It requires 'in' to be a
// LabelPair protobuf. 'out' is a metric.LabelPair.
func (d *MetricKeyDecoder) DecodeKey(in interface{}) (out interface{}, err error) {
unmarshaled := dto.LabelPair{}
err = proto.Unmarshal(in.([]byte), &unmarshaled)
if err != nil {
return
}
out = LabelPair{
Name: clientmodel.LabelName(*unmarshaled.Name),
Value: clientmodel.LabelValue(*unmarshaled.Value),
}
return
}
// DecodeValue implements storage.RecordDecoder. It is a no-op and always
// returns (nil, nil).
func (d *MetricKeyDecoder) DecodeValue(in interface{}) (out interface{}, err error) {
return
}
// MetricSamplesDecoder implements storage.RecordDecoder for SampleKeys.
type MetricSamplesDecoder struct{} type MetricSamplesDecoder struct{}
// DecodeKey implements storage.RecordDecoder. It requires 'in' to be a
// SampleKey protobuf. 'out' is a metric.SampleKey.
func (d *MetricSamplesDecoder) DecodeKey(in interface{}) (interface{}, error) { func (d *MetricSamplesDecoder) DecodeKey(in interface{}) (interface{}, error) {
key := &dto.SampleKey{} key := &dto.SampleKey{}
err := proto.Unmarshal(in.([]byte), key) err := proto.Unmarshal(in.([]byte), key)
@ -678,6 +679,8 @@ func (d *MetricSamplesDecoder) DecodeKey(in interface{}) (interface{}, error) {
return sampleKey, nil return sampleKey, nil
} }
// DecodeValue implements storage.RecordDecoder. It requires 'in' to be a
// SampleValueSeries protobuf. 'out' is of type metric.Values.
func (d *MetricSamplesDecoder) DecodeValue(in interface{}) (interface{}, error) { func (d *MetricSamplesDecoder) DecodeValue(in interface{}) (interface{}, error) {
values := &dto.SampleValueSeries{} values := &dto.SampleValueSeries{}
err := proto.Unmarshal(in.([]byte), values) err := proto.Unmarshal(in.([]byte), values)
@ -688,8 +691,27 @@ func (d *MetricSamplesDecoder) DecodeValue(in interface{}) (interface{}, error)
return NewValuesFromDTO(values), nil return NewValuesFromDTO(values), nil
} }
// AcceptAllFilter implements storage.RecordFilter and accepts all records.
type AcceptAllFilter struct{} type AcceptAllFilter struct{}
// Filter implements storage.RecordFilter. It always returns ACCEPT.
func (d *AcceptAllFilter) Filter(_, _ interface{}) storage.FilterResult { func (d *AcceptAllFilter) Filter(_, _ interface{}) storage.FilterResult {
return storage.ACCEPT return storage.Accept
}
// LabelNameFilter implements storage.RecordFilter and filters records matching
// a LabelName.
type LabelNameFilter struct {
labelName clientmodel.LabelName
}
// Filter implements storage.RecordFilter. 'key' is expected to be a
// LabelPair. The result is ACCEPT if the Name of the LabelPair matches the
// LabelName of this LabelNameFilter.
func (f LabelNameFilter) Filter(key, value interface{}) (filterResult storage.FilterResult) {
labelPair, ok := key.(LabelPair)
if ok && labelPair.Name == f.labelName {
return storage.Accept
}
return storage.Skip
} }

View file

@ -22,24 +22,10 @@ import (
"github.com/prometheus/prometheus/utility" "github.com/prometheus/prometheus/utility"
) )
// Assuming sample rate of 1 / 15Hz, this allows for one hour's worth of // An initialSeriesArenaSize of 4*60 allows for one hour's worth of storage per
// storage per metric without any major reallocations. // metric without any major reallocations - assuming a sample rate of 1 / 15Hz.
const initialSeriesArenaSize = 4 * 60 const initialSeriesArenaSize = 4 * 60
// Models a given sample entry stored in the in-memory arena.
type value interface {
// Gets the given value.
get() clientmodel.SampleValue
}
// Models a single sample value. It presumes that there is either no subsequent
// value seen or that any subsequent values are of a different value.
type singletonValue clientmodel.SampleValue
func (v singletonValue) get() clientmodel.SampleValue {
return clientmodel.SampleValue(v)
}
type stream interface { type stream interface {
add(...*SamplePair) add(...*SamplePair)
@ -194,9 +180,11 @@ type memorySeriesStorage struct {
labelNameToFingerprints map[clientmodel.LabelName]clientmodel.Fingerprints labelNameToFingerprints map[clientmodel.LabelName]clientmodel.Fingerprints
} }
// MemorySeriesOptions bundles options used by NewMemorySeriesStorage to create
// a memory series storage.
type MemorySeriesOptions struct { type MemorySeriesOptions struct {
// If provided, this WatermarkCache will be updated for any samples that are // If provided, this WatermarkCache will be updated for any samples that
// appended to the memorySeriesStorage. // are appended to the memorySeriesStorage.
WatermarkCache *watermarkCache WatermarkCache *watermarkCache
} }
@ -485,6 +473,7 @@ func (s *memorySeriesStorage) GetAllValuesForLabel(labelName clientmodel.LabelNa
return return
} }
// NewMemorySeriesStorage returns a memory series storage ready to use.
func NewMemorySeriesStorage(o MemorySeriesOptions) *memorySeriesStorage { func NewMemorySeriesStorage(o MemorySeriesOptions) *memorySeriesStorage {
return &memorySeriesStorage{ return &memorySeriesStorage{
fingerprintToSeries: make(map[clientmodel.Fingerprint]stream), fingerprintToSeries: make(map[clientmodel.Fingerprint]stream),

View file

@ -22,7 +22,7 @@ import (
clientmodel "github.com/prometheus/client_golang/model" clientmodel "github.com/prometheus/client_golang/model"
) )
// Encapsulates a primitive query operation. // op encapsulates a primitive query operation.
type op interface { type op interface {
// The time at which this operation starts. // The time at which this operation starts.
StartsAt() clientmodel.Timestamp StartsAt() clientmodel.Timestamp
@ -30,14 +30,14 @@ type op interface {
ExtractSamples(Values) Values ExtractSamples(Values) Values
// Return whether the operator has consumed all data it needs. // Return whether the operator has consumed all data it needs.
Consumed() bool Consumed() bool
// Get current operation time or nil if no subsequent work associated with // Get current operation time or nil if no subsequent work associated
// this operator remains. // with this operator remains.
CurrentTime() clientmodel.Timestamp CurrentTime() clientmodel.Timestamp
// GreedierThan indicates whether this present operation should take // GreedierThan indicates whether this present operation should take
// precedence over the other operation due to greediness. // precedence over the other operation due to greediness.
// //
// A critical assumption is that this operator and the other occur at the // A critical assumption is that this operator and the other occur at
// same time: this.StartsAt().Equal(op.StartsAt()). // the same time: this.StartsAt().Equal(op.StartsAt()).
GreedierThan(op) bool GreedierThan(op) bool
} }
@ -48,8 +48,8 @@ func (o ops) Len() int {
return len(o) return len(o)
} }
// startsAtSort implements the sorting protocol and allows operator to be sorted // startsAtSort implements sort.Interface and allows operator to be sorted in
// in chronological order by when they start. // chronological order by when they start.
type startsAtSort struct { type startsAtSort struct {
ops ops
} }
@ -62,7 +62,8 @@ func (o ops) Swap(i, j int) {
o[i], o[j] = o[j], o[i] o[i], o[j] = o[j], o[i]
} }
// Encapsulates getting values at or adjacent to a specific time. // getValuesAtTimeOp encapsulates getting values at or adjacent to a specific
// time.
type getValuesAtTimeOp struct { type getValuesAtTimeOp struct {
time clientmodel.Timestamp time clientmodel.Timestamp
consumed bool consumed bool
@ -112,15 +113,17 @@ func extractValuesAroundTime(t clientmodel.Timestamp, in Values) (out Values) {
out = in[len(in)-1:] out = in[len(in)-1:]
} else { } else {
if in[i].Timestamp.Equal(t) && len(in) > i+1 { if in[i].Timestamp.Equal(t) && len(in) > i+1 {
// We hit exactly the current sample time. Very unlikely in practice. // We hit exactly the current sample time. Very unlikely
// Return only the current sample. // in practice. Return only the current sample.
out = in[i : i+1] out = in[i : i+1]
} else { } else {
if i == 0 { if i == 0 {
// We hit before the first sample time. Return only the first sample. // We hit before the first sample time. Return
// only the first sample.
out = in[0:1] out = in[0:1]
} else { } else {
// We hit between two samples. Return both surrounding samples. // We hit between two samples. Return both
// surrounding samples.
out = in[i-1 : i+1] out = in[i-1 : i+1]
} }
} }
@ -136,15 +139,16 @@ func (g getValuesAtTimeOp) Consumed() bool {
return g.consumed return g.consumed
} }
// Encapsulates getting values at a given interval over a duration. // getValuesAtIntervalOp encapsulates getting values at a given interval over a
// duration.
type getValuesAtIntervalOp struct { type getValuesAtIntervalOp struct {
from clientmodel.Timestamp from clientmodel.Timestamp
through clientmodel.Timestamp through clientmodel.Timestamp
interval time.Duration interval time.Duration
} }
func (o *getValuesAtIntervalOp) String() string { func (g *getValuesAtIntervalOp) String() string {
return fmt.Sprintf("getValuesAtIntervalOp from %s each %s through %s", o.from, o.interval, o.through) return fmt.Sprintf("getValuesAtIntervalOp from %s each %s through %s", g.from, g.interval, g.through)
} }
func (g *getValuesAtIntervalOp) StartsAt() clientmodel.Timestamp { func (g *getValuesAtIntervalOp) StartsAt() clientmodel.Timestamp {
@ -199,14 +203,14 @@ func (g *getValuesAtIntervalOp) GreedierThan(op op) (superior bool) {
return return
} }
// Encapsulates getting all values in a given range. // getValuesAlongRangeOp encapsulates getting all values in a given range.
type getValuesAlongRangeOp struct { type getValuesAlongRangeOp struct {
from clientmodel.Timestamp from clientmodel.Timestamp
through clientmodel.Timestamp through clientmodel.Timestamp
} }
func (o *getValuesAlongRangeOp) String() string { func (g *getValuesAlongRangeOp) String() string {
return fmt.Sprintf("getValuesAlongRangeOp from %s through %s", o.from, o.through) return fmt.Sprintf("getValuesAlongRangeOp from %s through %s", g.from, g.through)
} }
func (g *getValuesAlongRangeOp) StartsAt() clientmodel.Timestamp { func (g *getValuesAlongRangeOp) StartsAt() clientmodel.Timestamp {
@ -271,7 +275,8 @@ func (g *getValuesAlongRangeOp) GreedierThan(op op) (superior bool) {
return return
} }
// Encapsulates getting all values from ranges along intervals. // getValueRangeAtIntervalOp encapsulates getting all values from ranges along
// intervals.
// //
// Works just like getValuesAlongRangeOp, but when from > through, through is // Works just like getValuesAlongRangeOp, but when from > through, through is
// incremented by interval and from is reset to through-rangeDuration. Returns // incremented by interval and from is reset to through-rangeDuration. Returns
@ -284,8 +289,8 @@ type getValueRangeAtIntervalOp struct {
through clientmodel.Timestamp through clientmodel.Timestamp
} }
func (o *getValueRangeAtIntervalOp) String() string { func (g *getValueRangeAtIntervalOp) String() string {
return fmt.Sprintf("getValueRangeAtIntervalOp range %s from %s each %s through %s", o.rangeDuration, o.rangeFrom, o.interval, o.through) return fmt.Sprintf("getValueRangeAtIntervalOp range %s from %s each %s through %s", g.rangeDuration, g.rangeFrom, g.interval, g.through)
} }
func (g *getValueRangeAtIntervalOp) StartsAt() clientmodel.Timestamp { func (g *getValueRangeAtIntervalOp) StartsAt() clientmodel.Timestamp {

View file

@ -26,43 +26,51 @@ import (
dto "github.com/prometheus/prometheus/model/generated" dto "github.com/prometheus/prometheus/model/generated"
) )
// processor models a post-processing agent that performs work given a sample // Processor models a post-processing agent that performs work given a sample
// corpus. // corpus.
type Processor interface { type Processor interface {
// Name emits the name of this processor's signature encoder. It must be // Name emits the name of this processor's signature encoder. It must
// fully-qualified in the sense that it could be used via a Protocol Buffer // be fully-qualified in the sense that it could be used via a Protocol
// registry to extract the descriptor to reassemble this message. // Buffer registry to extract the descriptor to reassemble this message.
Name() string Name() string
// Signature emits a byte signature for this process for the purpose of // Signature emits a byte signature for this process for the purpose of
// remarking how far along it has been applied to the database. // remarking how far along it has been applied to the database.
Signature() []byte Signature() []byte
// Apply runs this processor against the sample set. sampleIterator expects // Apply runs this processor against the sample set. sampleIterator
// to be pre-seeked to the initial starting position. The processor will // expects to be pre-seeked to the initial starting position. The
// run until up until stopAt has been reached. It is imperative that the // processor will run until up until stopAt has been reached. It is
// provided stopAt is within the interval of the series frontier. // imperative that the provided stopAt is within the interval of the
// series frontier.
// //
// Upon completion or error, the last time at which the processor finished // Upon completion or error, the last time at which the processor
// shall be emitted in addition to any errors. // finished shall be emitted in addition to any errors.
Apply(sampleIterator leveldb.Iterator, samplesPersistence raw.Persistence, stopAt clientmodel.Timestamp, fingerprint *clientmodel.Fingerprint) (lastCurated clientmodel.Timestamp, err error) Apply(sampleIterator leveldb.Iterator, samplesPersistence raw.Persistence, stopAt clientmodel.Timestamp, fingerprint *clientmodel.Fingerprint) (lastCurated clientmodel.Timestamp, err error)
// Close reaps all of the underlying system resources associated with
// this processor.
Close()
} }
// CompactionProcessor combines sparse values in the database together such // CompactionProcessor combines sparse values in the database together such that
// that at least MinimumGroupSize-sized chunks are grouped together. // at least MinimumGroupSize-sized chunks are grouped together. It implements
// the Processor interface.
type CompactionProcessor struct { type CompactionProcessor struct {
maximumMutationPoolBatch int maximumMutationPoolBatch int
minimumGroupSize int minimumGroupSize int
// signature is the byte representation of the CompactionProcessor's settings, // signature is the byte representation of the CompactionProcessor's
// used for purely memoization purposes across an instance. // settings, used for purely memoization purposes across an instance.
signature []byte signature []byte
dtoSampleKeys *dtoSampleKeyList dtoSampleKeys *dtoSampleKeyList
sampleKeys *sampleKeyList sampleKeys *sampleKeyList
} }
// Name implements the Processor interface. It returns
// "io.prometheus.CompactionProcessorDefinition".
func (p *CompactionProcessor) Name() string { func (p *CompactionProcessor) Name() string {
return "io.prometheus.CompactionProcessorDefinition" return "io.prometheus.CompactionProcessorDefinition"
} }
// Signature implements the Processor interface.
func (p *CompactionProcessor) Signature() []byte { func (p *CompactionProcessor) Signature() []byte {
if len(p.signature) == 0 { if len(p.signature) == 0 {
out, err := proto.Marshal(&dto.CompactionProcessorDefinition{ out, err := proto.Marshal(&dto.CompactionProcessorDefinition{
@ -82,8 +90,9 @@ func (p *CompactionProcessor) String() string {
return fmt.Sprintf("compactionProcessor for minimum group size %d", p.minimumGroupSize) return fmt.Sprintf("compactionProcessor for minimum group size %d", p.minimumGroupSize)
} }
// Apply implements the Processor interface.
func (p *CompactionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPersistence raw.Persistence, stopAt clientmodel.Timestamp, fingerprint *clientmodel.Fingerprint) (lastCurated clientmodel.Timestamp, err error) { func (p *CompactionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPersistence raw.Persistence, stopAt clientmodel.Timestamp, fingerprint *clientmodel.Fingerprint) (lastCurated clientmodel.Timestamp, err error) {
var pendingBatch raw.Batch = nil var pendingBatch raw.Batch
defer func() { defer func() {
if pendingBatch != nil { if pendingBatch != nil {
@ -125,7 +134,7 @@ func (p *CompactionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPers
// block would prevent us from going into unsafe territory. // block would prevent us from going into unsafe territory.
case len(unactedSamples) == 0: case len(unactedSamples) == 0:
if !sampleIterator.Next() { if !sampleIterator.Next() {
return lastCurated, fmt.Errorf("Illegal Condition: Invalid Iterator on Continuation") return lastCurated, fmt.Errorf("illegal condition: invalid iterator on continuation")
} }
keyDropped = false keyDropped = false
@ -163,7 +172,7 @@ func (p *CompactionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPers
case len(pendingSamples)+len(unactedSamples) < p.minimumGroupSize: case len(pendingSamples)+len(unactedSamples) < p.minimumGroupSize:
if !keyDropped { if !keyDropped {
k := new(dto.SampleKey) k := &dto.SampleKey{}
sampleKey.Dump(k) sampleKey.Dump(k)
pendingBatch.Drop(k) pendingBatch.Drop(k)
@ -176,10 +185,10 @@ func (p *CompactionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPers
// If the number of pending writes equals the target group size // If the number of pending writes equals the target group size
case len(pendingSamples) == p.minimumGroupSize: case len(pendingSamples) == p.minimumGroupSize:
k := new(dto.SampleKey) k := &dto.SampleKey{}
newSampleKey := pendingSamples.ToSampleKey(fingerprint) newSampleKey := pendingSamples.ToSampleKey(fingerprint)
newSampleKey.Dump(k) newSampleKey.Dump(k)
b := new(dto.SampleValueSeries) b := &dto.SampleValueSeries{}
pendingSamples.dump(b) pendingSamples.dump(b)
pendingBatch.Put(k, b) pendingBatch.Put(k, b)
@ -205,7 +214,7 @@ func (p *CompactionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPers
case len(pendingSamples)+len(unactedSamples) >= p.minimumGroupSize: case len(pendingSamples)+len(unactedSamples) >= p.minimumGroupSize:
if !keyDropped { if !keyDropped {
k := new(dto.SampleKey) k := &dto.SampleKey{}
sampleKey.Dump(k) sampleKey.Dump(k)
pendingBatch.Drop(k) pendingBatch.Drop(k)
keyDropped = true keyDropped = true
@ -220,16 +229,16 @@ func (p *CompactionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPers
} }
pendingMutations++ pendingMutations++
default: default:
err = fmt.Errorf("Unhandled processing case.") err = fmt.Errorf("unhandled processing case")
} }
} }
if len(unactedSamples) > 0 || len(pendingSamples) > 0 { if len(unactedSamples) > 0 || len(pendingSamples) > 0 {
pendingSamples = append(pendingSamples, unactedSamples...) pendingSamples = append(pendingSamples, unactedSamples...)
k := new(dto.SampleKey) k := &dto.SampleKey{}
newSampleKey := pendingSamples.ToSampleKey(fingerprint) newSampleKey := pendingSamples.ToSampleKey(fingerprint)
newSampleKey.Dump(k) newSampleKey.Dump(k)
b := new(dto.SampleValueSeries) b := &dto.SampleValueSeries{}
pendingSamples.dump(b) pendingSamples.dump(b)
pendingBatch.Put(k, b) pendingBatch.Put(k, b)
pendingSamples = Values{} pendingSamples = Values{}
@ -249,11 +258,14 @@ func (p *CompactionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPers
return return
} }
// Close implements the Processor interface.
func (p *CompactionProcessor) Close() { func (p *CompactionProcessor) Close() {
p.dtoSampleKeys.Close() p.dtoSampleKeys.Close()
p.sampleKeys.Close() p.sampleKeys.Close()
} }
// CompactionProcessorOptions are used for connstruction of a
// CompactionProcessor.
type CompactionProcessorOptions struct { type CompactionProcessorOptions struct {
// MaximumMutationPoolBatch represents approximately the largest pending // MaximumMutationPoolBatch represents approximately the largest pending
// batch of mutation operations for the database before pausing to // batch of mutation operations for the database before pausing to
@ -266,6 +278,7 @@ type CompactionProcessorOptions struct {
MinimumGroupSize int MinimumGroupSize int
} }
// NewCompactionProcessor returns a CompactionProcessor ready to use.
func NewCompactionProcessor(o *CompactionProcessorOptions) *CompactionProcessor { func NewCompactionProcessor(o *CompactionProcessorOptions) *CompactionProcessor {
return &CompactionProcessor{ return &CompactionProcessor{
maximumMutationPoolBatch: o.MaximumMutationPoolBatch, maximumMutationPoolBatch: o.MaximumMutationPoolBatch,
@ -276,7 +289,8 @@ func NewCompactionProcessor(o *CompactionProcessorOptions) *CompactionProcessor
} }
} }
// DeletionProcessor deletes sample blocks older than a defined value. // DeletionProcessor deletes sample blocks older than a defined value. It
// implements the Processor interface.
type DeletionProcessor struct { type DeletionProcessor struct {
maximumMutationPoolBatch int maximumMutationPoolBatch int
// signature is the byte representation of the DeletionProcessor's settings, // signature is the byte representation of the DeletionProcessor's settings,
@ -287,10 +301,13 @@ type DeletionProcessor struct {
sampleKeys *sampleKeyList sampleKeys *sampleKeyList
} }
// Name implements the Processor interface. It returns
// "io.prometheus.DeletionProcessorDefinition".
func (p *DeletionProcessor) Name() string { func (p *DeletionProcessor) Name() string {
return "io.prometheus.DeletionProcessorDefinition" return "io.prometheus.DeletionProcessorDefinition"
} }
// Signature implements the Processor interface.
func (p *DeletionProcessor) Signature() []byte { func (p *DeletionProcessor) Signature() []byte {
if len(p.signature) == 0 { if len(p.signature) == 0 {
out, err := proto.Marshal(&dto.DeletionProcessorDefinition{}) out, err := proto.Marshal(&dto.DeletionProcessorDefinition{})
@ -309,8 +326,9 @@ func (p *DeletionProcessor) String() string {
return "deletionProcessor" return "deletionProcessor"
} }
// Apply implements the Processor interface.
func (p *DeletionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPersistence raw.Persistence, stopAt clientmodel.Timestamp, fingerprint *clientmodel.Fingerprint) (lastCurated clientmodel.Timestamp, err error) { func (p *DeletionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPersistence raw.Persistence, stopAt clientmodel.Timestamp, fingerprint *clientmodel.Fingerprint) (lastCurated clientmodel.Timestamp, err error) {
var pendingBatch raw.Batch = nil var pendingBatch raw.Batch
defer func() { defer func() {
if pendingBatch != nil { if pendingBatch != nil {
@ -342,12 +360,13 @@ func (p *DeletionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPersis
case pendingBatch == nil: case pendingBatch == nil:
pendingBatch = leveldb.NewBatch() pendingBatch = leveldb.NewBatch()
// If there are no sample values to extract from the datastore, let's // If there are no sample values to extract from the datastore,
// continue extracting more values to use. We know that the time.Before() // let's continue extracting more values to use. We know that
// block would prevent us from going into unsafe territory. // the time.Before() block would prevent us from going into
// unsafe territory.
case len(sampleValues) == 0: case len(sampleValues) == 0:
if !sampleIterator.Next() { if !sampleIterator.Next() {
return lastCurated, fmt.Errorf("Illegal Condition: Invalid Iterator on Continuation") return lastCurated, fmt.Errorf("illegal condition: invalid iterator on continuation")
} }
if err = sampleIterator.Key(sampleKeyDto); err != nil { if err = sampleIterator.Key(sampleKeyDto); err != nil {
@ -360,9 +379,9 @@ func (p *DeletionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPersis
return return
} }
// If the number of pending mutations exceeds the allowed batch amount, // If the number of pending mutations exceeds the allowed batch
// commit to disk and delete the batch. A new one will be recreated if // amount, commit to disk and delete the batch. A new one will
// necessary. // be recreated if necessary.
case pendingMutations >= p.maximumMutationPoolBatch: case pendingMutations >= p.maximumMutationPoolBatch:
err = samplesPersistence.Commit(pendingBatch) err = samplesPersistence.Commit(pendingBatch)
if err != nil { if err != nil {
@ -403,7 +422,7 @@ func (p *DeletionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPersis
} }
default: default:
err = fmt.Errorf("Unhandled processing case.") err = fmt.Errorf("unhandled processing case")
} }
} }
@ -419,11 +438,13 @@ func (p *DeletionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPersis
return return
} }
// Close implements the Processor interface.
func (p *DeletionProcessor) Close() { func (p *DeletionProcessor) Close() {
p.dtoSampleKeys.Close() p.dtoSampleKeys.Close()
p.sampleKeys.Close() p.sampleKeys.Close()
} }
// DeletionProcessorOptions are used for connstruction of a DeletionProcessor.
type DeletionProcessorOptions struct { type DeletionProcessorOptions struct {
// MaximumMutationPoolBatch represents approximately the largest pending // MaximumMutationPoolBatch represents approximately the largest pending
// batch of mutation operations for the database before pausing to // batch of mutation operations for the database before pausing to
@ -431,6 +452,7 @@ type DeletionProcessorOptions struct {
MaximumMutationPoolBatch int MaximumMutationPoolBatch int
} }
// NewDeletionProcessor returns a DeletionProcessor ready to use.
func NewDeletionProcessor(o *DeletionProcessorOptions) *DeletionProcessor { func NewDeletionProcessor(o *DeletionProcessorOptions) *DeletionProcessor {
return &DeletionProcessor{ return &DeletionProcessor{
maximumMutationPoolBatch: o.MaximumMutationPoolBatch, maximumMutationPoolBatch: o.MaximumMutationPoolBatch,

View file

@ -112,7 +112,7 @@ func (s sampleGroup) Get() (key, value proto.Message) {
return k, v return k, v
} }
type noopUpdater bool type noopUpdater struct{}
func (noopUpdater) UpdateCurationState(*CurationState) {} func (noopUpdater) UpdateCurationState(*CurationState) {}
@ -876,7 +876,7 @@ func TestCuratorCompactionProcessor(t *testing.T) {
} }
defer samples.Close() defer samples.Close()
updates := new(noopUpdater) updates := &noopUpdater{}
stop := make(chan bool) stop := make(chan bool)
defer close(stop) defer close(stop)
@ -891,7 +891,7 @@ func TestCuratorCompactionProcessor(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
iterator, err := curatorStates.p.NewIterator(true) iterator, err := curatorStates.LevelDBPersistence.NewIterator(true)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -909,12 +909,12 @@ func TestCuratorCompactionProcessor(t *testing.T) {
} }
} }
curationKeyDto := new(dto.CurationKey) curationKeyDto := &dto.CurationKey{}
err = iterator.Key(curationKeyDto) err = iterator.Key(curationKeyDto)
if err != nil { if err != nil {
t.Fatalf("%d.%d. could not unmarshal: %s", i, j, err) t.Fatalf("%d.%d. could not unmarshal: %s", i, j, err)
} }
actualKey := new(curationKey) actualKey := &curationKey{}
actualKey.load(curationKeyDto) actualKey.load(curationKeyDto)
actualValue, present, err := curatorStates.Get(actualKey) actualValue, present, err := curatorStates.Get(actualKey)
@ -988,7 +988,7 @@ func TestCuratorCompactionProcessor(t *testing.T) {
for k, actualValue := range sampleValues { for k, actualValue := range sampleValues {
if expected.values[k].Value != actualValue.Value { if expected.values[k].Value != actualValue.Value {
t.Fatalf("%d.%d.%d. expected %d, got %d", i, j, k, expected.values[k].Value, actualValue.Value) t.Fatalf("%d.%d.%d. expected %v, got %v", i, j, k, expected.values[k].Value, actualValue.Value)
} }
if !expected.values[k].Timestamp.Equal(actualValue.Timestamp) { if !expected.values[k].Timestamp.Equal(actualValue.Timestamp) {
t.Fatalf("%d.%d.%d. expected %s, got %s", i, j, k, expected.values[k].Timestamp, actualValue.Timestamp) t.Fatalf("%d.%d.%d. expected %s, got %s", i, j, k, expected.values[k].Timestamp, actualValue.Timestamp)
@ -1405,7 +1405,7 @@ func TestCuratorDeletionProcessor(t *testing.T) {
} }
defer samples.Close() defer samples.Close()
updates := new(noopUpdater) updates := &noopUpdater{}
stop := make(chan bool) stop := make(chan bool)
defer close(stop) defer close(stop)
@ -1420,7 +1420,7 @@ func TestCuratorDeletionProcessor(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
iterator, err := curatorStates.p.NewIterator(true) iterator, err := curatorStates.LevelDBPersistence.NewIterator(true)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -1438,12 +1438,12 @@ func TestCuratorDeletionProcessor(t *testing.T) {
} }
} }
curationKeyDto := new(dto.CurationKey) curationKeyDto := &dto.CurationKey{}
if err := iterator.Key(curationKeyDto); err != nil { if err := iterator.Key(curationKeyDto); err != nil {
t.Fatalf("%d.%d. could not unmarshal: %s", i, j, err) t.Fatalf("%d.%d. could not unmarshal: %s", i, j, err)
} }
actualKey := new(curationKey) actualKey := &curationKey{}
actualKey.load(curationKeyDto) actualKey.load(curationKeyDto)
signature := expected.processor.Signature() signature := expected.processor.Signature()
@ -1518,7 +1518,7 @@ func TestCuratorDeletionProcessor(t *testing.T) {
for k, actualValue := range sampleValues { for k, actualValue := range sampleValues {
if expected.values[k].Value != actualValue.Value { if expected.values[k].Value != actualValue.Value {
t.Fatalf("%d.%d.%d. expected %d, got %d", i, j, k, expected.values[k].Value, actualValue.Value) t.Fatalf("%d.%d.%d. expected %v, got %v", i, j, k, expected.values[k].Value, actualValue.Value)
} }
if !expected.values[k].Timestamp.Equal(actualValue.Timestamp) { if !expected.values[k].Timestamp.Equal(actualValue.Timestamp) {
t.Fatalf("%d.%d.%d. expected %s, got %s", i, j, k, expected.values[k].Timestamp, actualValue.Timestamp) t.Fatalf("%d.%d.%d. expected %s, got %s", i, j, k, expected.values[k].Timestamp, actualValue.Timestamp)

View file

@ -22,7 +22,7 @@ import (
"github.com/prometheus/prometheus/utility/test" "github.com/prometheus/prometheus/utility/test"
) )
func GetValueAtTimeTests(persistenceMaker func() (MetricPersistence, test.Closer), t test.Tester) { func GetValueAtTimeTests(persistenceMaker func() (ViewableMetricPersistence, test.Closer), t test.Tester) {
type value struct { type value struct {
year int year int
month time.Month month time.Month
@ -355,7 +355,7 @@ func GetValueAtTimeTests(persistenceMaker func() (MetricPersistence, test.Closer
} }
} }
func GetRangeValuesTests(persistenceMaker func() (MetricPersistence, test.Closer), onlyBoundaries bool, t test.Tester) { func GetRangeValuesTests(persistenceMaker func() (ViewableMetricPersistence, test.Closer), onlyBoundaries bool, t test.Tester) {
type value struct { type value struct {
year int year int
month time.Month month time.Month
@ -854,7 +854,7 @@ func GetRangeValuesTests(persistenceMaker func() (MetricPersistence, test.Closer
} }
if actualValues == nil && len(expectedValues) != 0 { if actualValues == nil && len(expectedValues) != 0 {
t.Fatalf("%d.%d(%s). Expected %s but got: %s\n", i, j, behavior.name, expectedValues, actualValues) t.Fatalf("%d.%d(%s). Expected %v but got: %v\n", i, j, behavior.name, expectedValues, actualValues)
} }
if expectedValues == nil { if expectedValues == nil {
@ -899,7 +899,7 @@ func GetRangeValuesTests(persistenceMaker func() (MetricPersistence, test.Closer
// Test Definitions Follow // Test Definitions Follow
func testMemoryGetValueAtTime(t test.Tester) { func testMemoryGetValueAtTime(t test.Tester) {
persistenceMaker := func() (MetricPersistence, test.Closer) { persistenceMaker := func() (ViewableMetricPersistence, test.Closer) {
return NewMemorySeriesStorage(MemorySeriesOptions{}), test.NilCloser return NewMemorySeriesStorage(MemorySeriesOptions{}), test.NilCloser
} }
@ -927,7 +927,7 @@ func BenchmarkMemoryGetBoundaryValues(b *testing.B) {
} }
func testMemoryGetRangeValues(t test.Tester) { func testMemoryGetRangeValues(t test.Tester) {
persistenceMaker := func() (MetricPersistence, test.Closer) { persistenceMaker := func() (ViewableMetricPersistence, test.Closer) {
return NewMemorySeriesStorage(MemorySeriesOptions{}), test.NilCloser return NewMemorySeriesStorage(MemorySeriesOptions{}), test.NilCloser
} }
@ -935,7 +935,7 @@ func testMemoryGetRangeValues(t test.Tester) {
} }
func testMemoryGetBoundaryValues(t test.Tester) { func testMemoryGetBoundaryValues(t test.Tester) {
persistenceMaker := func() (MetricPersistence, test.Closer) { persistenceMaker := func() (ViewableMetricPersistence, test.Closer) {
return NewMemorySeriesStorage(MemorySeriesOptions{}), test.NilCloser return NewMemorySeriesStorage(MemorySeriesOptions{}), test.NilCloser
} }

View file

@ -25,15 +25,19 @@ import (
dto "github.com/prometheus/prometheus/model/generated" dto "github.com/prometheus/prometheus/model/generated"
) )
// MarshalJSON implements json.Marshaler.
func (s SamplePair) MarshalJSON() ([]byte, error) { func (s SamplePair) MarshalJSON() ([]byte, error) {
return []byte(fmt.Sprintf("{\"Value\": \"%f\", \"Timestamp\": %d}", s.Value, s.Timestamp)), nil return []byte(fmt.Sprintf("{\"Value\": \"%f\", \"Timestamp\": %d}", s.Value, s.Timestamp)), nil
} }
// SamplePair pairs a SampleValue with a Timestamp.
type SamplePair struct { type SamplePair struct {
Value clientmodel.SampleValue Value clientmodel.SampleValue
Timestamp clientmodel.Timestamp Timestamp clientmodel.Timestamp
} }
// Equal returns true if this SamplePair and o have equal Values and equal
// Timestamps.
func (s *SamplePair) Equal(o *SamplePair) bool { func (s *SamplePair) Equal(o *SamplePair) bool {
if s == o { if s == o {
return true return true
@ -54,20 +58,27 @@ func (s *SamplePair) String() string {
return fmt.Sprintf("SamplePair at %s of %s", s.Timestamp, s.Value) return fmt.Sprintf("SamplePair at %s of %s", s.Timestamp, s.Value)
} }
// Values is a sortable slice of SamplePair pointers (as in: it implements
// sort.Interface). Sorting happens by Timestamp.
type Values []*SamplePair type Values []*SamplePair
// Len implements sort.Interface.
func (v Values) Len() int { func (v Values) Len() int {
return len(v) return len(v)
} }
// Less implements sort.Interface.
func (v Values) Less(i, j int) bool { func (v Values) Less(i, j int) bool {
return v[i].Timestamp.Before(v[j].Timestamp) return v[i].Timestamp.Before(v[j].Timestamp)
} }
// Swap implements sort.Interface.
func (v Values) Swap(i, j int) { func (v Values) Swap(i, j int) {
v[i], v[j] = v[j], v[i] v[i], v[j] = v[j], v[i]
} }
// Equal returns true if these Values are of the same length as o, and each
// value is equal to the corresponding value in o (i.e. at the same index).
func (v Values) Equal(o Values) bool { func (v Values) Equal(o Values) bool {
if len(v) != len(o) { if len(v) != len(o) {
return false return false
@ -132,6 +143,7 @@ func (v Values) dump(d *dto.SampleValueSeries) {
} }
} }
// ToSampleKey returns the SampleKey for these Values.
func (v Values) ToSampleKey(f *clientmodel.Fingerprint) *SampleKey { func (v Values) ToSampleKey(f *clientmodel.Fingerprint) *SampleKey {
return &SampleKey{ return &SampleKey{
Fingerprint: f, Fingerprint: f,
@ -156,9 +168,10 @@ func (v Values) String() string {
return buffer.String() return buffer.String()
} }
// NewValuesFromDTO deserializes Values from a DTO.
func NewValuesFromDTO(d *dto.SampleValueSeries) Values { func NewValuesFromDTO(d *dto.SampleValueSeries) Values {
// BUG(matt): Incogruent from the other load/dump API types, but much more // BUG(matt): Incogruent from the other load/dump API types, but much
// performant. // more performant.
v := make(Values, 0, len(d.Value)) v := make(Values, 0, len(d.Value))
for _, value := range d.Value { for _, value := range d.Value {
@ -171,11 +184,13 @@ func NewValuesFromDTO(d *dto.SampleValueSeries) Values {
return v return v
} }
// SampleSet is Values with a Metric attached.
type SampleSet struct { type SampleSet struct {
Metric clientmodel.Metric Metric clientmodel.Metric
Values Values Values Values
} }
// Interval describes the inclusive interval between two Timestamps.
type Interval struct { type Interval struct {
OldestInclusive clientmodel.Timestamp OldestInclusive clientmodel.Timestamp
NewestInclusive clientmodel.Timestamp NewestInclusive clientmodel.Timestamp

View file

@ -49,6 +49,8 @@ func (s *SampleKey) Constrain(first, last *SampleKey) bool {
} }
} }
// Equal returns true if this SampleKey and o have equal fingerprints,
// timestamps, and sample counts.
func (s *SampleKey) Equal(o *SampleKey) bool { func (s *SampleKey) Equal(o *SampleKey) bool {
if s == o { if s == o {
return true return true
@ -81,6 +83,11 @@ func (s *SampleKey) MayContain(t clientmodel.Timestamp) bool {
} }
} }
// Before returns true if the Fingerprint of this SampleKey is less than fp and
// false if it is greater. If both fingerprints are equal, the FirstTimestamp of
// this SampleKey is checked in the same way against t. If the timestamps are
// eqal, the LastTimestamp of this SampleKey is checked against t (and false is
// returned if they are equal again).
func (s *SampleKey) Before(fp *clientmodel.Fingerprint, t clientmodel.Timestamp) bool { func (s *SampleKey) Before(fp *clientmodel.Fingerprint, t clientmodel.Timestamp) bool {
if s.Fingerprint.Less(fp) { if s.Fingerprint.Less(fp) {
return true return true
@ -96,7 +103,7 @@ func (s *SampleKey) Before(fp *clientmodel.Fingerprint, t clientmodel.Timestamp)
return s.LastTimestamp.Before(t) return s.LastTimestamp.Before(t)
} }
// ToDTO converts this SampleKey into a DTO for use in serialization purposes. // Dump converts this SampleKey into a DTO for use in serialization purposes.
func (s *SampleKey) Dump(d *dto.SampleKey) { func (s *SampleKey) Dump(d *dto.SampleKey) {
d.Reset() d.Reset()
fp := &dto.Fingerprint{} fp := &dto.Fingerprint{}
@ -112,6 +119,7 @@ func (s *SampleKey) String() string {
return fmt.Sprintf("SampleKey for %s at %s to %s with %d values.", s.Fingerprint, s.FirstTimestamp, s.LastTimestamp, s.SampleCount) return fmt.Sprintf("SampleKey for %s at %s to %s with %d values.", s.Fingerprint, s.FirstTimestamp, s.LastTimestamp, s.SampleCount)
} }
// Load deserializes this SampleKey from a DTO.
func (s *SampleKey) Load(d *dto.SampleKey) { func (s *SampleKey) Load(d *dto.SampleKey) {
f := &clientmodel.Fingerprint{} f := &clientmodel.Fingerprint{}
loadFingerprint(f, d.GetFingerprint()) loadFingerprint(f, d.GetFingerprint())

View file

@ -422,8 +422,8 @@ func StochasticTests(persistenceMaker func() (MetricPersistence, test.Closer), t
for i := 0; i < numberOfRangeScans; i++ { for i := 0; i < numberOfRangeScans; i++ {
timestamps := metricTimestamps[metricIndex] timestamps := metricTimestamps[metricIndex]
var first int64 = 0 var first int64
var second int64 = 0 var second int64
for { for {
firstCandidate := random.Int63n(int64(len(timestamps))) firstCandidate := random.Int63n(int64(len(timestamps)))
@ -472,6 +472,11 @@ func StochasticTests(persistenceMaker func() (MetricPersistence, test.Closer), t
fp := &clientmodel.Fingerprint{} fp := &clientmodel.Fingerprint{}
fp.LoadFromMetric(metric) fp.LoadFromMetric(metric)
switch persistence := p.(type) { switch persistence := p.(type) {
case View:
samples = persistence.GetRangeValues(fp, interval)
if len(samples) < 2 {
t.Fatalf("expected sample count greater than %d, got %d", 2, len(samples))
}
case *LevelDBMetricPersistence: case *LevelDBMetricPersistence:
var err error var err error
samples, err = levelDBGetRangeValues(persistence, fp, interval) samples, err = levelDBGetRangeValues(persistence, fp, interval)
@ -482,10 +487,7 @@ func StochasticTests(persistenceMaker func() (MetricPersistence, test.Closer), t
t.Fatalf("expected sample count greater than %d, got %d", 2, len(samples)) t.Fatalf("expected sample count greater than %d, got %d", 2, len(samples))
} }
default: default:
samples = p.GetRangeValues(fp, interval) t.Error("Unexpected type of MetricPersistence.")
if len(samples) < 2 {
t.Fatalf("expected sample count greater than %d, got %d", 2, len(samples))
}
} }
} }
} }

View file

@ -114,10 +114,11 @@ const (
const watermarkCacheLimit = 1024 * 1024 const watermarkCacheLimit = 1024 * 1024
// NewTieredStorage returns a TieredStorage object ready to use.
func NewTieredStorage(appendToDiskQueueDepth, viewQueueDepth uint, flushMemoryInterval time.Duration, memoryTTL time.Duration, rootDirectory string) (*TieredStorage, error) { func NewTieredStorage(appendToDiskQueueDepth, viewQueueDepth uint, flushMemoryInterval time.Duration, memoryTTL time.Duration, rootDirectory string) (*TieredStorage, error) {
if isDir, _ := utility.IsDir(rootDirectory); !isDir { if isDir, _ := utility.IsDir(rootDirectory); !isDir {
if err := os.MkdirAll(rootDirectory, 0755); err != nil { if err := os.MkdirAll(rootDirectory, 0755); err != nil {
return nil, fmt.Errorf("Could not find or create metrics directory %s: %s", rootDirectory, err) return nil, fmt.Errorf("could not find or create metrics directory %s: %s", rootDirectory, err)
} }
} }
@ -160,12 +161,12 @@ func NewTieredStorage(appendToDiskQueueDepth, viewQueueDepth uint, flushMemoryIn
return s, nil return s, nil
} }
// Enqueues Samples for storage. // AppendSamples enqueues Samples for storage.
func (t *TieredStorage) AppendSamples(samples clientmodel.Samples) (err error) { func (t *TieredStorage) AppendSamples(samples clientmodel.Samples) (err error) {
t.mu.RLock() t.mu.RLock()
defer t.mu.RUnlock() defer t.mu.RUnlock()
if t.state != tieredStorageServing { if t.state != tieredStorageServing {
return fmt.Errorf("Storage is not serving.") return fmt.Errorf("storage is not serving")
} }
t.memoryArena.AppendSamples(samples) t.memoryArena.AppendSamples(samples)
@ -174,7 +175,7 @@ func (t *TieredStorage) AppendSamples(samples clientmodel.Samples) (err error) {
return return
} }
// Stops the storage subsystem, flushing all pending operations. // Drain stops the storage subsystem, flushing all pending operations.
func (t *TieredStorage) Drain(drained chan<- bool) { func (t *TieredStorage) Drain(drained chan<- bool) {
t.mu.Lock() t.mu.Lock()
defer t.mu.Unlock() defer t.mu.Unlock()
@ -193,20 +194,22 @@ func (t *TieredStorage) drain(drained chan<- bool) {
t.draining <- (drained) t.draining <- (drained)
} }
// Materializes a View according to a ViewRequestBuilder, subject to a timeout. // MakeView materializes a View according to a ViewRequestBuilder, subject to a
// timeout.
func (t *TieredStorage) MakeView(builder ViewRequestBuilder, deadline time.Duration, queryStats *stats.TimerGroup) (View, error) { func (t *TieredStorage) MakeView(builder ViewRequestBuilder, deadline time.Duration, queryStats *stats.TimerGroup) (View, error) {
t.mu.RLock() t.mu.RLock()
defer t.mu.RUnlock() defer t.mu.RUnlock()
if t.state != tieredStorageServing { if t.state != tieredStorageServing {
return nil, fmt.Errorf("Storage is not serving") return nil, fmt.Errorf("storage is not serving")
} }
// The result channel needs a one-element buffer in case we have timed out in // The result channel needs a one-element buffer in case we have timed
// MakeView, but the view rendering still completes afterwards and writes to // out in MakeView, but the view rendering still completes afterwards
// the channel. // and writes to the channel.
result := make(chan View, 1) result := make(chan View, 1)
// The abort channel needs a one-element buffer in case the view rendering // The abort channel needs a one-element buffer in case the view
// has already exited and doesn't consume from the channel anymore. // rendering has already exited and doesn't consume from the channel
// anymore.
abortChan := make(chan bool, 1) abortChan := make(chan bool, 1)
errChan := make(chan error) errChan := make(chan error)
queryStats.GetTimer(stats.ViewQueueTime).Start() queryStats.GetTimer(stats.ViewQueueTime).Start()
@ -225,11 +228,11 @@ func (t *TieredStorage) MakeView(builder ViewRequestBuilder, deadline time.Durat
return nil, err return nil, err
case <-time.After(deadline): case <-time.After(deadline):
abortChan <- true abortChan <- true
return nil, fmt.Errorf("MakeView timed out after %s.", deadline) return nil, fmt.Errorf("fetching query data timed out after %s", deadline)
} }
} }
// Starts serving requests. // Serve starts serving requests.
func (t *TieredStorage) Serve(started chan<- bool) { func (t *TieredStorage) Serve(started chan<- bool) {
t.mu.Lock() t.mu.Lock()
if t.state != tieredStorageStarting { if t.state != tieredStorageStarting {
@ -284,6 +287,7 @@ func (t *TieredStorage) reportQueues() {
queueSizes.Set(map[string]string{"queue": "view_generation", "facet": "capacity"}, float64(cap(t.ViewQueue))) queueSizes.Set(map[string]string{"queue": "view_generation", "facet": "capacity"}, float64(cap(t.ViewQueue)))
} }
// Flush flushes all samples to disk.
func (t *TieredStorage) Flush() { func (t *TieredStorage) Flush() {
t.flushSema <- true t.flushSema <- true
t.flushMemory(0) t.flushMemory(0)
@ -311,6 +315,7 @@ func (t *TieredStorage) flushMemory(ttl time.Duration) {
glog.Info("Done flushing.") glog.Info("Done flushing.")
} }
// Close stops serving, flushes all pending operations, and frees all resources.
func (t *TieredStorage) Close() { func (t *TieredStorage) Close() {
t.mu.Lock() t.mu.Lock()
defer t.mu.Unlock() defer t.mu.Unlock()
@ -329,8 +334,8 @@ func (t *TieredStorage) close() {
t.memoryArena.Close() t.memoryArena.Close()
t.DiskStorage.Close() t.DiskStorage.Close()
// BUG(matt): There is a probability that pending items may hang here and not // BUG(matt): There is a probability that pending items may hang here
// get flushed. // and not get flushed.
close(t.appendToDiskQueue) close(t.appendToDiskQueue)
close(t.ViewQueue) close(t.ViewQueue)
t.wmCache.Clear() t.wmCache.Clear()
@ -639,7 +644,8 @@ func (t *TieredStorage) loadChunkAroundTime(iterator leveldb.Iterator, fingerpri
panic("illegal state: violated sort invariant") panic("illegal state: violated sort invariant")
} }
// Get all label values that are associated with the provided label name. // GetAllValuesForLabel gets all label values that are associated with the
// provided label name.
func (t *TieredStorage) GetAllValuesForLabel(labelName clientmodel.LabelName) (clientmodel.LabelValues, error) { func (t *TieredStorage) GetAllValuesForLabel(labelName clientmodel.LabelName) (clientmodel.LabelValues, error) {
t.mu.RLock() t.mu.RLock()
defer t.mu.RUnlock() defer t.mu.RUnlock()
@ -669,8 +675,8 @@ func (t *TieredStorage) GetAllValuesForLabel(labelName clientmodel.LabelName) (c
return values, nil return values, nil
} }
// Get all of the metric fingerprints that are associated with the provided // GetFingerprintsForLabelSet gets all of the metric fingerprints that are
// label set. // associated with the provided label set.
func (t *TieredStorage) GetFingerprintsForLabelSet(labelSet clientmodel.LabelSet) (clientmodel.Fingerprints, error) { func (t *TieredStorage) GetFingerprintsForLabelSet(labelSet clientmodel.LabelSet) (clientmodel.Fingerprints, error) {
t.mu.RLock() t.mu.RLock()
defer t.mu.RUnlock() defer t.mu.RUnlock()
@ -700,7 +706,8 @@ func (t *TieredStorage) GetFingerprintsForLabelSet(labelSet clientmodel.LabelSet
return fingerprints, nil return fingerprints, nil
} }
// Get the metric associated with the provided fingerprint. // GetMetricForFingerprint gets the metric associated with the provided
// fingerprint.
func (t *TieredStorage) GetMetricForFingerprint(f *clientmodel.Fingerprint) (clientmodel.Metric, error) { func (t *TieredStorage) GetMetricForFingerprint(f *clientmodel.Fingerprint) (clientmodel.Metric, error) {
t.mu.RLock() t.mu.RLock()
defer t.mu.RUnlock() defer t.mu.RUnlock()

View file

@ -27,8 +27,9 @@ var (
lastSupertime = []byte{127, 255, 255, 255, 255, 255, 255, 255} lastSupertime = []byte{127, 255, 255, 255, 255, 255, 255, 255}
) )
// Represents the summation of all datastore queries that shall be performed to // ViewRequestBuilder represents the summation of all datastore queries that
// extract values. Each operation mutates the state of the builder. // shall be performed to extract values. Each operation mutates the state of
// the builder.
type ViewRequestBuilder interface { type ViewRequestBuilder interface {
GetMetricAtTime(fingerprint *clientmodel.Fingerprint, time clientmodel.Timestamp) GetMetricAtTime(fingerprint *clientmodel.Fingerprint, time clientmodel.Timestamp)
GetMetricAtInterval(fingerprint *clientmodel.Fingerprint, from, through clientmodel.Timestamp, interval time.Duration) GetMetricAtInterval(fingerprint *clientmodel.Fingerprint, from, through clientmodel.Timestamp, interval time.Duration)
@ -36,12 +37,13 @@ type ViewRequestBuilder interface {
ScanJobs() scanJobs ScanJobs() scanJobs
} }
// Contains the various unoptimized requests for data. // viewRequestBuilder contains the various unoptimized requests for data.
type viewRequestBuilder struct { type viewRequestBuilder struct {
operations map[clientmodel.Fingerprint]ops operations map[clientmodel.Fingerprint]ops
} }
// Furnishes a ViewRequestBuilder for remarking what types of queries to perform. // NewViewRequestBuilder furnishes a ViewRequestBuilder for remarking what types
// of queries to perform.
func NewViewRequestBuilder() *viewRequestBuilder { func NewViewRequestBuilder() *viewRequestBuilder {
return &viewRequestBuilder{ return &viewRequestBuilder{
operations: make(map[clientmodel.Fingerprint]ops), operations: make(map[clientmodel.Fingerprint]ops),
@ -50,8 +52,8 @@ func NewViewRequestBuilder() *viewRequestBuilder {
var getValuesAtTimes = newValueAtTimeList(10 * 1024) var getValuesAtTimes = newValueAtTimeList(10 * 1024)
// Gets for the given Fingerprint either the value at that time if there is an // GetMetricAtTime gets for the given Fingerprint either the value at that time
// match or the one or two values adjacent thereto. // if there is an match or the one or two values adjacent thereto.
func (v *viewRequestBuilder) GetMetricAtTime(fingerprint *clientmodel.Fingerprint, time clientmodel.Timestamp) { func (v *viewRequestBuilder) GetMetricAtTime(fingerprint *clientmodel.Fingerprint, time clientmodel.Timestamp) {
ops := v.operations[*fingerprint] ops := v.operations[*fingerprint]
op, _ := getValuesAtTimes.Get() op, _ := getValuesAtTimes.Get()
@ -62,9 +64,9 @@ func (v *viewRequestBuilder) GetMetricAtTime(fingerprint *clientmodel.Fingerprin
var getValuesAtIntervals = newValueAtIntervalList(10 * 1024) var getValuesAtIntervals = newValueAtIntervalList(10 * 1024)
// Gets for the given Fingerprint either the value at that interval from From // GetMetricAtInterval gets for the given Fingerprint either the value at that
// through Through if there is an match or the one or two values adjacent // interval from From through Through if there is an match or the one or two
// for each point. // values adjacent for each point.
func (v *viewRequestBuilder) GetMetricAtInterval(fingerprint *clientmodel.Fingerprint, from, through clientmodel.Timestamp, interval time.Duration) { func (v *viewRequestBuilder) GetMetricAtInterval(fingerprint *clientmodel.Fingerprint, from, through clientmodel.Timestamp, interval time.Duration) {
ops := v.operations[*fingerprint] ops := v.operations[*fingerprint]
op, _ := getValuesAtIntervals.Get() op, _ := getValuesAtIntervals.Get()
@ -77,8 +79,8 @@ func (v *viewRequestBuilder) GetMetricAtInterval(fingerprint *clientmodel.Finger
var getValuesAlongRanges = newValueAlongRangeList(10 * 1024) var getValuesAlongRanges = newValueAlongRangeList(10 * 1024)
// Gets for the given Fingerprint the values that occur inclusively from From // GetMetricRange gets for the given Fingerprint the values that occur
// through Through. // inclusively from From through Through.
func (v *viewRequestBuilder) GetMetricRange(fingerprint *clientmodel.Fingerprint, from, through clientmodel.Timestamp) { func (v *viewRequestBuilder) GetMetricRange(fingerprint *clientmodel.Fingerprint, from, through clientmodel.Timestamp) {
ops := v.operations[*fingerprint] ops := v.operations[*fingerprint]
op, _ := getValuesAlongRanges.Get() op, _ := getValuesAlongRanges.Get()
@ -90,7 +92,8 @@ func (v *viewRequestBuilder) GetMetricRange(fingerprint *clientmodel.Fingerprint
var getValuesAtIntervalAlongRanges = newValueAtIntervalAlongRangeList(10 * 1024) var getValuesAtIntervalAlongRanges = newValueAtIntervalAlongRangeList(10 * 1024)
// Gets value ranges at intervals for the given Fingerprint: // GetMetricRangeAtInterval gets value ranges at intervals for the given
// Fingerprint:
// //
// |----| |----| |----| |----| // |----| |----| |----| |----|
// ^ ^ ^ ^ ^ ^ // ^ ^ ^ ^ ^ ^
@ -108,7 +111,7 @@ func (v *viewRequestBuilder) GetMetricRangeAtInterval(fingerprint *clientmodel.F
v.operations[*fingerprint] = ops v.operations[*fingerprint] = ops
} }
// Emits the optimized scans that will occur in the data store. This // ScanJobs emits the optimized scans that will occur in the data store. This
// effectively resets the ViewRequestBuilder back to a pristine state. // effectively resets the ViewRequestBuilder back to a pristine state.
func (v *viewRequestBuilder) ScanJobs() (j scanJobs) { func (v *viewRequestBuilder) ScanJobs() (j scanJobs) {
for fingerprint, operations := range v.operations { for fingerprint, operations := range v.operations {

View file

@ -17,8 +17,6 @@ import (
"code.google.com/p/goprotobuf/proto" "code.google.com/p/goprotobuf/proto"
clientmodel "github.com/prometheus/client_golang/model" clientmodel "github.com/prometheus/client_golang/model"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/storage/raw" "github.com/prometheus/prometheus/storage/raw"
"github.com/prometheus/prometheus/storage/raw/leveldb" "github.com/prometheus/prometheus/storage/raw/leveldb"
"github.com/prometheus/prometheus/utility" "github.com/prometheus/prometheus/utility"
@ -40,27 +38,32 @@ func (w *watermarks) dump(d *dto.MetricHighWatermark) {
d.Timestamp = proto.Int64(w.High.Unix()) d.Timestamp = proto.Int64(w.High.Unix())
} }
// A FingerprintHighWatermarkMapping is used for batch updates of many high
// watermarks in a database.
type FingerprintHighWatermarkMapping map[clientmodel.Fingerprint]clientmodel.Timestamp type FingerprintHighWatermarkMapping map[clientmodel.Fingerprint]clientmodel.Timestamp
// HighWatermarker models a high-watermark database.
type HighWatermarker interface { type HighWatermarker interface {
raw.Database
raw.ForEacher raw.ForEacher
raw.Pruner raw.Pruner
UpdateBatch(FingerprintHighWatermarkMapping) error UpdateBatch(FingerprintHighWatermarkMapping) error
Get(*clientmodel.Fingerprint) (t clientmodel.Timestamp, ok bool, err error) Get(*clientmodel.Fingerprint) (t clientmodel.Timestamp, ok bool, err error)
State() *raw.DatabaseState
Size() (uint64, bool, error)
} }
// LevelDBHighWatermarker is an implementation of HighWatermarker backed by
// leveldb.
type LevelDBHighWatermarker struct { type LevelDBHighWatermarker struct {
p *leveldb.LevelDBPersistence *leveldb.LevelDBPersistence
} }
// Get implements HighWatermarker.
func (w *LevelDBHighWatermarker) Get(f *clientmodel.Fingerprint) (t clientmodel.Timestamp, ok bool, err error) { func (w *LevelDBHighWatermarker) Get(f *clientmodel.Fingerprint) (t clientmodel.Timestamp, ok bool, err error) {
k := new(dto.Fingerprint) k := &dto.Fingerprint{}
dumpFingerprint(k, f) dumpFingerprint(k, f)
v := new(dto.MetricHighWatermark) v := &dto.MetricHighWatermark{}
ok, err = w.p.Get(k, v) ok, err = w.LevelDBPersistence.Get(k, v)
if err != nil { if err != nil {
return t, ok, err return t, ok, err
} }
@ -71,6 +74,7 @@ func (w *LevelDBHighWatermarker) Get(f *clientmodel.Fingerprint) (t clientmodel.
return t, true, nil return t, true, nil
} }
// UpdateBatch implements HighWatermarker.
func (w *LevelDBHighWatermarker) UpdateBatch(m FingerprintHighWatermarkMapping) error { func (w *LevelDBHighWatermarker) UpdateBatch(m FingerprintHighWatermarkMapping) error {
batch := leveldb.NewBatch() batch := leveldb.NewBatch()
defer batch.Close() defer batch.Close()
@ -80,9 +84,9 @@ func (w *LevelDBHighWatermarker) UpdateBatch(m FingerprintHighWatermarkMapping)
if err != nil { if err != nil {
return err return err
} }
k := new(dto.Fingerprint) k := &dto.Fingerprint{}
dumpFingerprint(k, &fp) dumpFingerprint(k, &fp)
v := new(dto.MetricHighWatermark) v := &dto.MetricHighWatermark{}
if !present { if !present {
v.Timestamp = proto.Int64(t.Unix()) v.Timestamp = proto.Int64(t.Unix())
batch.Put(k, v) batch.Put(k, v)
@ -97,36 +101,15 @@ func (w *LevelDBHighWatermarker) UpdateBatch(m FingerprintHighWatermarkMapping)
} }
} }
return w.p.Commit(batch) return w.LevelDBPersistence.Commit(batch)
}
func (w *LevelDBHighWatermarker) ForEach(d storage.RecordDecoder, f storage.RecordFilter, o storage.RecordOperator) (bool, error) {
return w.p.ForEach(d, f, o)
}
func (w *LevelDBHighWatermarker) Prune() (bool, error) {
w.p.Prune()
return false, nil
}
func (w *LevelDBHighWatermarker) Close() {
w.p.Close()
}
func (w *LevelDBHighWatermarker) State() *raw.DatabaseState {
return w.p.State()
}
func (w *LevelDBHighWatermarker) Size() (uint64, bool, error) {
s, err := w.p.Size()
return s, true, err
} }
// LevelDBHighWatermarkerOptions just wraps leveldb.LevelDBOptions.
type LevelDBHighWatermarkerOptions struct { type LevelDBHighWatermarkerOptions struct {
leveldb.LevelDBOptions leveldb.LevelDBOptions
} }
// NewLevelDBHighWatermarker returns a LevelDBHighWatermarker ready to use.
func NewLevelDBHighWatermarker(o LevelDBHighWatermarkerOptions) (*LevelDBHighWatermarker, error) { func NewLevelDBHighWatermarker(o LevelDBHighWatermarkerOptions) (*LevelDBHighWatermarker, error) {
s, err := leveldb.NewLevelDBPersistence(o.LevelDBOptions) s, err := leveldb.NewLevelDBPersistence(o.LevelDBOptions)
if err != nil { if err != nil {
@ -134,52 +117,37 @@ func NewLevelDBHighWatermarker(o LevelDBHighWatermarkerOptions) (*LevelDBHighWat
} }
return &LevelDBHighWatermarker{ return &LevelDBHighWatermarker{
p: s, LevelDBPersistence: s,
}, nil }, nil
} }
// CurationRemarker models a curation remarker database.
type CurationRemarker interface { type CurationRemarker interface {
raw.Database
raw.Pruner raw.Pruner
Update(*curationKey, clientmodel.Timestamp) error Update(*curationKey, clientmodel.Timestamp) error
Get(*curationKey) (t clientmodel.Timestamp, ok bool, err error) Get(*curationKey) (t clientmodel.Timestamp, ok bool, err error)
State() *raw.DatabaseState
Size() (uint64, bool, error)
} }
// LevelDBCurationRemarker is an implementation of CurationRemarker backed by
// leveldb.
type LevelDBCurationRemarker struct { type LevelDBCurationRemarker struct {
p *leveldb.LevelDBPersistence *leveldb.LevelDBPersistence
} }
// LevelDBCurationRemarkerOptions just wraps leveldb.LevelDBOptions.
type LevelDBCurationRemarkerOptions struct { type LevelDBCurationRemarkerOptions struct {
leveldb.LevelDBOptions leveldb.LevelDBOptions
} }
func (w *LevelDBCurationRemarker) State() *raw.DatabaseState { // Get implements CurationRemarker.
return w.p.State()
}
func (w *LevelDBCurationRemarker) Size() (uint64, bool, error) {
s, err := w.p.Size()
return s, true, err
}
func (w *LevelDBCurationRemarker) Close() {
w.p.Close()
}
func (w *LevelDBCurationRemarker) Prune() (bool, error) {
w.p.Prune()
return false, nil
}
func (w *LevelDBCurationRemarker) Get(c *curationKey) (t clientmodel.Timestamp, ok bool, err error) { func (w *LevelDBCurationRemarker) Get(c *curationKey) (t clientmodel.Timestamp, ok bool, err error) {
k := new(dto.CurationKey) k := &dto.CurationKey{}
c.dump(k) c.dump(k)
v := new(dto.CurationValue) v := &dto.CurationValue{}
ok, err = w.p.Get(k, v) ok, err = w.LevelDBPersistence.Get(k, v)
if err != nil || !ok { if err != nil || !ok {
return clientmodel.TimestampFromUnix(0), ok, err return clientmodel.TimestampFromUnix(0), ok, err
} }
@ -187,15 +155,17 @@ func (w *LevelDBCurationRemarker) Get(c *curationKey) (t clientmodel.Timestamp,
return clientmodel.TimestampFromUnix(v.GetLastCompletionTimestamp()), true, nil return clientmodel.TimestampFromUnix(v.GetLastCompletionTimestamp()), true, nil
} }
// Update implements CurationRemarker.
func (w *LevelDBCurationRemarker) Update(pair *curationKey, t clientmodel.Timestamp) error { func (w *LevelDBCurationRemarker) Update(pair *curationKey, t clientmodel.Timestamp) error {
k := new(dto.CurationKey) k := &dto.CurationKey{}
pair.dump(k) pair.dump(k)
return w.p.Put(k, &dto.CurationValue{ return w.LevelDBPersistence.Put(k, &dto.CurationValue{
LastCompletionTimestamp: proto.Int64(t.Unix()), LastCompletionTimestamp: proto.Int64(t.Unix()),
}) })
} }
// NewLevelDBCurationRemarker returns a LevelDBCurationRemarker ready to use.
func NewLevelDBCurationRemarker(o LevelDBCurationRemarkerOptions) (*LevelDBCurationRemarker, error) { func NewLevelDBCurationRemarker(o LevelDBCurationRemarkerOptions) (*LevelDBCurationRemarker, error) {
s, err := leveldb.NewLevelDBPersistence(o.LevelDBOptions) s, err := leveldb.NewLevelDBPersistence(o.LevelDBOptions)
if err != nil { if err != nil {
@ -203,7 +173,7 @@ func NewLevelDBCurationRemarker(o LevelDBCurationRemarkerOptions) (*LevelDBCurat
} }
return &LevelDBCurationRemarker{ return &LevelDBCurationRemarker{
p: s, LevelDBPersistence: s,
}, nil }, nil
} }

View file

@ -1,23 +0,0 @@
// Copyright 2013 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package index
import "code.google.com/p/goprotobuf/proto"
type MembershipIndex interface {
Has(key proto.Message) (bool, error)
Put(key proto.Message) error
Drop(key proto.Message) error
Close()
}

View file

@ -1,23 +0,0 @@
// Copyright 2013 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package leveldb
import (
"github.com/prometheus/prometheus/storage/raw/index"
"testing"
)
func TestInterfaceAdherence(t *testing.T) {
var _ index.MembershipIndex = &LevelDBMembershipIndex{}
}

View file

@ -1,80 +0,0 @@
// Copyright 2013 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package leveldb
import (
"code.google.com/p/goprotobuf/proto"
"github.com/prometheus/prometheus/storage/raw"
"github.com/prometheus/prometheus/storage/raw/leveldb"
dto "github.com/prometheus/prometheus/model/generated"
)
var existenceValue = new(dto.MembershipIndexValue)
type LevelDBMembershipIndex struct {
persistence *leveldb.LevelDBPersistence
}
func (l *LevelDBMembershipIndex) Close() {
l.persistence.Close()
}
func (l *LevelDBMembershipIndex) Has(k proto.Message) (bool, error) {
return l.persistence.Has(k)
}
func (l *LevelDBMembershipIndex) Drop(k proto.Message) error {
return l.persistence.Drop(k)
}
func (l *LevelDBMembershipIndex) Put(k proto.Message) error {
return l.persistence.Put(k, existenceValue)
}
type LevelDBIndexOptions struct {
leveldb.LevelDBOptions
}
func NewLevelDBMembershipIndex(o LevelDBIndexOptions) (i *LevelDBMembershipIndex, err error) {
leveldbPersistence, err := leveldb.NewLevelDBPersistence(o.LevelDBOptions)
if err != nil {
return nil, err
}
return &LevelDBMembershipIndex{
persistence: leveldbPersistence,
}, nil
}
func (l *LevelDBMembershipIndex) Commit(batch raw.Batch) error {
return l.persistence.Commit(batch)
}
// CompactKeyspace compacts the entire database's keyspace.
//
// Beware that it would probably be imprudent to run this on a live user-facing
// server due to latency implications.
func (l *LevelDBMembershipIndex) Prune() {
l.persistence.Prune()
}
func (l *LevelDBMembershipIndex) Size() (uint64, error) {
return l.persistence.Size()
}
func (l *LevelDBMembershipIndex) State() *raw.DatabaseState {
return l.persistence.State()
}

View file

@ -19,9 +19,25 @@ import (
"github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/storage"
) )
// Database provides a few very basic methods to manage a database and inquire
// its state.
type Database interface {
// Close reaps all of the underlying system resources associated with
// this database. For databases that don't need that kind of clean-up,
// it is implemented as a no-op (so that clients don't need to reason
// and always call Close 'just in case').
Close() error
// State reports the state of the database as a DatabaseState object.
State() *DatabaseState
// Size returns the total size of the database in bytes. The number may
// be an approximation, depending on the underlying database type.
Size() (uint64, error)
}
// ForEacher is implemented by databases that can be iterated through.
type ForEacher interface { type ForEacher interface {
// ForEach is responsible for iterating through all records in the database // ForEach is responsible for iterating through all records in the
// until one of the following conditions are met: // database until one of the following conditions are met:
// //
// 1.) A system anomaly in the database scan. // 1.) A system anomaly in the database scan.
// 2.) The last record in the database is reached. // 2.) The last record in the database is reached.
@ -31,18 +47,21 @@ type ForEacher interface {
ForEach(storage.RecordDecoder, storage.RecordFilter, storage.RecordOperator) (scannedEntireCorpus bool, err error) ForEach(storage.RecordDecoder, storage.RecordFilter, storage.RecordOperator) (scannedEntireCorpus bool, err error)
} }
// Pruner is implemented by a database that can be pruned in some way.
type Pruner interface {
Prune()
}
// Persistence models a key-value store for bytes that supports various // Persistence models a key-value store for bytes that supports various
// additional operations. // additional operations.
type Persistence interface { type Persistence interface {
Database
ForEacher ForEacher
// Close reaps all of the underlying system resources associated with this
// persistence.
Close() error
// Has informs the user whether a given key exists in the database. // Has informs the user whether a given key exists in the database.
Has(key proto.Message) (bool, error) Has(key proto.Message) (bool, error)
// Get retrieves the key from the database if it exists or returns nil if // Get populates 'value' with the value of 'key', if present, in which
// it is absent. // case 'present' is returned as true.
Get(key, value proto.Message) (present bool, err error) Get(key, value proto.Message) (present bool, err error)
// Drop removes the key from the database. // Drop removes the key from the database.
Drop(key proto.Message) error Drop(key proto.Message) error
@ -56,15 +75,11 @@ type Persistence interface {
// en masse. The interface implies no protocol around the atomicity of // en masse. The interface implies no protocol around the atomicity of
// effectuation. // effectuation.
type Batch interface { type Batch interface {
// Close reaps all of the underlying system resources associated with this // Close reaps all of the underlying system resources associated with
// batch mutation. // this batch mutation.
Close() Close()
// Put follows the same protocol as Persistence.Put. // Put follows the same protocol as Persistence.Put.
Put(key, value proto.Message) Put(key, value proto.Message)
// Drop follows the same protocol as Persistence.Drop. // Drop follows the same protocol as Persistence.Drop.
Drop(key proto.Message) Drop(key proto.Message)
} }
type Pruner interface {
Prune() (noop bool, err error)
}

View file

@ -26,6 +26,7 @@ type batch struct {
puts uint32 puts uint32
} }
// NewBatch returns a fully allocated batch object.
func NewBatch() *batch { func NewBatch() *batch {
return &batch{ return &batch{
batch: levigo.NewWriteBatch(), batch: levigo.NewWriteBatch(),

View file

@ -20,5 +20,5 @@ import (
) )
func TestInterfaceAdherence(t *testing.T) { func TestInterfaceAdherence(t *testing.T) {
var _ raw.Persistence = new(LevelDBPersistence) var _ raw.Persistence = &LevelDBPersistence{}
} }

View file

@ -20,6 +20,7 @@ import (
// TODO: Evaluate whether to use coding.Encoder for the key and values instead // TODO: Evaluate whether to use coding.Encoder for the key and values instead
// raw bytes for consistency reasons. // raw bytes for consistency reasons.
// Iterator provides method to iterate through a leveldb.
type Iterator interface { type Iterator interface {
Error() error Error() error
Valid() bool Valid() bool

View file

@ -24,7 +24,8 @@ import (
"github.com/prometheus/prometheus/storage/raw" "github.com/prometheus/prometheus/storage/raw"
) )
// LevelDBPersistence is a disk-backed sorted key-value store. // LevelDBPersistence is a disk-backed sorted key-value store. It implements the
// interfaces raw.Database, raw.ForEacher, raw.Pruner, raw.Persistence.
type LevelDBPersistence struct { type LevelDBPersistence struct {
path string path string
name string name string
@ -43,23 +44,24 @@ type LevelDBPersistence struct {
type levigoIterator struct { type levigoIterator struct {
// iterator is the receiver of most proxied operation calls. // iterator is the receiver of most proxied operation calls.
iterator *levigo.Iterator iterator *levigo.Iterator
// readOptions is only set if the iterator is a snapshot of an underlying // readOptions is only set if the iterator is a snapshot of an
// database. This signals that it needs to be explicitly reaped upon the // underlying database. This signals that it needs to be explicitly
// end of this iterator's life. // reaped upon the end of this iterator's life.
readOptions *levigo.ReadOptions readOptions *levigo.ReadOptions
// snapshot is only set if the iterator is a snapshot of an underlying // snapshot is only set if the iterator is a snapshot of an underlying
// database. This signals that it needs to be explicitly reaped upon the // database. This signals that it needs to be explicitly reaped upon
// end of this this iterator's life. // the end of this this iterator's life.
snapshot *levigo.Snapshot snapshot *levigo.Snapshot
// storage is only set if the iterator is a snapshot of an underlying // storage is only set if the iterator is a snapshot of an underlying
// database. This signals that it needs to be explicitly reaped upon the // database. This signals that it needs to be explicitly reaped upon
// end of this this iterator's life. The snapshot must be freed in the // the end of this this iterator's life. The snapshot must be freed in
// context of an actual database. // the context of an actual database.
storage *levigo.DB storage *levigo.DB
// closed indicates whether the iterator has been closed before. // closed indicates whether the iterator has been closed before.
closed bool closed bool
// valid indicates whether the iterator may be used. If a LevelDB iterator // valid indicates whether the iterator may be used. If a LevelDB
// ever becomes invalid, it must be disposed of and cannot be reused. // iterator ever becomes invalid, it must be disposed of and cannot be
// reused.
valid bool valid bool
// creationTime provides the time at which the iterator was made. // creationTime provides the time at which the iterator was made.
creationTime time.Time creationTime time.Time
@ -191,13 +193,16 @@ func (i *levigoIterator) Valid() bool {
return i.valid return i.valid
} }
// Compression defines the compression mode.
type Compression uint type Compression uint
// Possible compression modes.
const ( const (
Snappy Compression = iota Snappy Compression = iota
Uncompressed Uncompressed
) )
// LevelDBOptions bundles options needed to create a LevelDBPersistence object.
type LevelDBOptions struct { type LevelDBOptions struct {
Path string Path string
Name string Name string
@ -212,6 +217,8 @@ type LevelDBOptions struct {
Compression Compression Compression Compression
} }
// NewLevelDBPersistence returns an initialized LevelDBPersistence object,
// created with the given options.
func NewLevelDBPersistence(o LevelDBOptions) (*LevelDBPersistence, error) { func NewLevelDBPersistence(o LevelDBOptions) (*LevelDBPersistence, error) {
options := levigo.NewOptions() options := levigo.NewOptions()
options.SetCreateIfMissing(true) options.SetCreateIfMissing(true)
@ -257,9 +264,10 @@ func NewLevelDBPersistence(o LevelDBOptions) (*LevelDBPersistence, error) {
}, nil }, nil
} }
// Close implements raw.Persistence (and raw.Database).
func (l *LevelDBPersistence) Close() error { func (l *LevelDBPersistence) Close() error {
// These are deferred to take advantage of forced closing in case of stack // These are deferred to take advantage of forced closing in case of
// unwinding due to anomalies. // stack unwinding due to anomalies.
defer func() { defer func() {
if l.filterPolicy != nil { if l.filterPolicy != nil {
l.filterPolicy.Close() l.filterPolicy.Close()
@ -299,6 +307,7 @@ func (l *LevelDBPersistence) Close() error {
return nil return nil
} }
// Get implements raw.Persistence.
func (l *LevelDBPersistence) Get(k, v proto.Message) (bool, error) { func (l *LevelDBPersistence) Get(k, v proto.Message) (bool, error) {
buf, _ := buffers.Get() buf, _ := buffers.Get()
defer buffers.Give(buf) defer buffers.Give(buf)
@ -328,10 +337,12 @@ func (l *LevelDBPersistence) Get(k, v proto.Message) (bool, error) {
return true, nil return true, nil
} }
// Has implements raw.Persistence.
func (l *LevelDBPersistence) Has(k proto.Message) (has bool, err error) { func (l *LevelDBPersistence) Has(k proto.Message) (has bool, err error) {
return l.Get(k, nil) return l.Get(k, nil)
} }
// Drop implements raw.Persistence.
func (l *LevelDBPersistence) Drop(k proto.Message) error { func (l *LevelDBPersistence) Drop(k proto.Message) error {
buf, _ := buffers.Get() buf, _ := buffers.Get()
defer buffers.Give(buf) defer buffers.Give(buf)
@ -343,27 +354,29 @@ func (l *LevelDBPersistence) Drop(k proto.Message) error {
return l.storage.Delete(l.writeOptions, buf.Bytes()) return l.storage.Delete(l.writeOptions, buf.Bytes())
} }
func (l *LevelDBPersistence) Put(key, value proto.Message) error { // Put implements raw.Persistence.
func (l *LevelDBPersistence) Put(k, v proto.Message) error {
keyBuf, _ := buffers.Get() keyBuf, _ := buffers.Get()
defer buffers.Give(keyBuf) defer buffers.Give(keyBuf)
if err := keyBuf.Marshal(key); err != nil { if err := keyBuf.Marshal(k); err != nil {
panic(err) panic(err)
} }
valBuf, _ := buffers.Get() valBuf, _ := buffers.Get()
defer buffers.Give(valBuf) defer buffers.Give(valBuf)
if err := valBuf.Marshal(value); err != nil { if err := valBuf.Marshal(v); err != nil {
panic(err) panic(err)
} }
return l.storage.Put(l.writeOptions, keyBuf.Bytes(), valBuf.Bytes()) return l.storage.Put(l.writeOptions, keyBuf.Bytes(), valBuf.Bytes())
} }
// Commit implements raw.Persistence.
func (l *LevelDBPersistence) Commit(b raw.Batch) (err error) { func (l *LevelDBPersistence) Commit(b raw.Batch) (err error) {
// XXX: This is a wart to clean up later. Ideally, after doing extensive // XXX: This is a wart to clean up later. Ideally, after doing
// tests, we could create a Batch struct that journals pending // extensive tests, we could create a Batch struct that journals pending
// operations which the given Persistence implementation could convert // operations which the given Persistence implementation could convert
// to its specific commit requirements. // to its specific commit requirements.
batch, ok := b.(*batch) batch, ok := b.(*batch)
@ -374,7 +387,7 @@ func (l *LevelDBPersistence) Commit(b raw.Batch) (err error) {
return l.storage.Write(l.writeOptions, batch.batch) return l.storage.Write(l.writeOptions, batch.batch)
} }
// CompactKeyspace compacts the entire database's keyspace. // Prune implements raw.Pruner. It compacts the entire keyspace of the database.
// //
// Beware that it would probably be imprudent to run this on a live user-facing // Beware that it would probably be imprudent to run this on a live user-facing
// server due to latency implications. // server due to latency implications.
@ -389,6 +402,8 @@ func (l *LevelDBPersistence) Prune() {
l.storage.CompactRange(keyspace) l.storage.CompactRange(keyspace)
} }
// Size returns the approximate size the entire database takes on disk (in
// bytes). It implements the raw.Database interface.
func (l *LevelDBPersistence) Size() (uint64, error) { func (l *LevelDBPersistence) Size() (uint64, error) {
iterator, err := l.NewIterator(false) iterator, err := l.NewIterator(false)
if err != nil { if err != nil {
@ -459,6 +474,7 @@ func (l *LevelDBPersistence) NewIterator(snapshotted bool) (Iterator, error) {
}, nil }, nil
} }
// ForEach implements raw.ForEacher.
func (l *LevelDBPersistence) ForEach(decoder storage.RecordDecoder, filter storage.RecordFilter, operator storage.RecordOperator) (scannedEntireCorpus bool, err error) { func (l *LevelDBPersistence) ForEach(decoder storage.RecordDecoder, filter storage.RecordFilter, operator storage.RecordOperator) (scannedEntireCorpus bool, err error) {
iterator, err := l.NewIterator(true) iterator, err := l.NewIterator(true)
if err != nil { if err != nil {
@ -482,11 +498,11 @@ func (l *LevelDBPersistence) ForEach(decoder storage.RecordDecoder, filter stora
} }
switch filter.Filter(decodedKey, decodedValue) { switch filter.Filter(decodedKey, decodedValue) {
case storage.STOP: case storage.Stop:
return return
case storage.SKIP: case storage.Skip:
continue continue
case storage.ACCEPT: case storage.Accept:
opErr := operator.Operate(decodedKey, decodedValue) opErr := operator.Operate(decodedKey, decodedValue)
if opErr != nil { if opErr != nil {
if opErr.Continuable { if opErr.Continuable {

View file

@ -23,6 +23,11 @@ const (
sstablesKey = "leveldb.sstables" sstablesKey = "leveldb.sstables"
) )
// State returns the DatabaseState. It implements the raw.Database interface and
// sets the following Supplemental entries:
// "Low Level": leveldb property value for "leveldb.stats"
// "SSTable": leveldb property value for "leveldb.sstables"
// "Errors": only set if an error has occurred determining the size
func (l *LevelDBPersistence) State() *raw.DatabaseState { func (l *LevelDBPersistence) State() *raw.DatabaseState {
databaseState := &raw.DatabaseState{ databaseState := &raw.DatabaseState{
Location: l.path, Location: l.path,

View file

@ -23,8 +23,8 @@ import (
const cacheCapacity = 0 const cacheCapacity = 0
type ( type (
// Pair models a prospective (key, value) double that will be committed to // Pair models a prospective (key, value) double that will be committed
// a database. // to a database.
Pair interface { Pair interface {
Get() (key, value proto.Message) Get() (key, value proto.Message)
} }
@ -32,17 +32,18 @@ type (
// Pairs models a list of Pair for disk committing. // Pairs models a list of Pair for disk committing.
Pairs []Pair Pairs []Pair
// Preparer readies a LevelDB store for a given raw state given the fixtures // Preparer readies a LevelDB store for a given raw state given the
// definitions passed into it. // fixtures definitions passed into it.
Preparer interface { Preparer interface {
// Prepare furnishes the database and returns its path along with any // Prepare furnishes the database and returns its path along
// encountered anomalies. // with any encountered anomalies.
Prepare(namespace string, f FixtureFactory) test.TemporaryDirectory Prepare(namespace string, f FixtureFactory) test.TemporaryDirectory
} }
// FixtureFactory is an iterator emitting fixture data.
FixtureFactory interface { FixtureFactory interface {
// HasNext indicates whether the FixtureFactory has more pending fixture // HasNext indicates whether the FixtureFactory has more pending
// data to build. // fixture data to build.
HasNext() (has bool) HasNext() (has bool)
// Next emits the next (key, value) double for storage. // Next emits the next (key, value) double for storage.
Next() (key, value proto.Message) Next() (key, value proto.Message)
@ -85,10 +86,12 @@ func (p preparer) Prepare(n string, f FixtureFactory) (t test.TemporaryDirectory
return return
} }
// HasNext implements FixtureFactory.
func (f cassetteFactory) HasNext() bool { func (f cassetteFactory) HasNext() bool {
return f.index < f.count return f.index < f.count
} }
// Next implements FixtureFactory.
func (f *cassetteFactory) Next() (key, value proto.Message) { func (f *cassetteFactory) Next() (key, value proto.Message) {
key, value = f.pairs[f.index].Get() key, value = f.pairs[f.index].Get()

View file

@ -17,6 +17,7 @@ import (
"github.com/prometheus/prometheus/utility" "github.com/prometheus/prometheus/utility"
) )
// DatabaseState contains some fundamental attributes of a database.
type DatabaseState struct { type DatabaseState struct {
Name string Name string
@ -28,12 +29,17 @@ type DatabaseState struct {
Supplemental map[string]string Supplemental map[string]string
} }
// DatabaseStates is a sortable slice of DatabaseState pointers. It implements
// sort.Interface.
type DatabaseStates []*DatabaseState type DatabaseStates []*DatabaseState
// Len implements sort.Interface.
func (s DatabaseStates) Len() int { func (s DatabaseStates) Len() int {
return len(s) return len(s)
} }
// Less implements sort.Interface. The primary sorting criterion is the Name,
// the secondary criterion is the Size.
func (s DatabaseStates) Less(i, j int) bool { func (s DatabaseStates) Less(i, j int) bool {
l := s[i] l := s[i]
r := s[j] r := s[j]
@ -48,6 +54,7 @@ func (s DatabaseStates) Less(i, j int) bool {
return l.Size < r.Size return l.Size < r.Size
} }
// Swap implements sort.Interface.
func (s DatabaseStates) Swap(i, j int) { func (s DatabaseStates) Swap(i, j int) {
s[i], s[j] = s[j], s[i] s[i], s[j] = s[j], s[i]
} }

View file

@ -17,7 +17,7 @@ import (
const ( const (
putEndpoint = "/api/put" putEndpoint = "/api/put"
contentTypeJson = "application/json" contentTypeJSON = "application/json"
) )
var ( var (
@ -30,7 +30,7 @@ type Client struct {
httpClient *http.Client httpClient *http.Client
} }
// Create a new Client. // NewClient creates a new Client.
func NewClient(url string, timeout time.Duration) *Client { func NewClient(url string, timeout time.Duration) *Client {
return &Client{ return &Client{
url: url, url: url,
@ -47,12 +47,13 @@ type StoreSamplesRequest struct {
Tags map[string]string `json:"tags"` Tags map[string]string `json:"tags"`
} }
// Escape Prometheus label values to valid tag values for OpenTSDB. // escapeTagValue escapes Prometheus label values to valid tag values for
// OpenTSDB.
func escapeTagValue(l clientmodel.LabelValue) string { func escapeTagValue(l clientmodel.LabelValue) string {
return illegalCharsRE.ReplaceAllString(string(l), "_") return illegalCharsRE.ReplaceAllString(string(l), "_")
} }
// Translate Prometheus metric into OpenTSDB tags. // tagsFromMetric translates Prometheus metric into OpenTSDB tags.
func tagsFromMetric(m clientmodel.Metric) map[string]string { func tagsFromMetric(m clientmodel.Metric) map[string]string {
tags := make(map[string]string, len(m)-1) tags := make(map[string]string, len(m)-1)
for l, v := range m { for l, v := range m {
@ -64,7 +65,7 @@ func tagsFromMetric(m clientmodel.Metric) map[string]string {
return tags return tags
} }
// Send a batch of samples to OpenTSDB via its HTTP API. // Store sends a batch of samples to OpenTSDB via its HTTP API.
func (c *Client) Store(samples clientmodel.Samples) error { func (c *Client) Store(samples clientmodel.Samples) error {
reqs := make([]StoreSamplesRequest, 0, len(samples)) reqs := make([]StoreSamplesRequest, 0, len(samples))
for _, s := range samples { for _, s := range samples {
@ -91,7 +92,7 @@ func (c *Client) Store(samples clientmodel.Samples) error {
resp, err := c.httpClient.Post( resp, err := c.httpClient.Post(
u.String(), u.String(),
contentTypeJson, contentTypeJSON,
bytes.NewBuffer(buf), bytes.NewBuffer(buf),
) )
if err != nil { if err != nil {
@ -116,5 +117,5 @@ func (c *Client) Store(samples clientmodel.Samples) error {
if err := json.Unmarshal(buf, &r); err != nil { if err := json.Unmarshal(buf, &r); err != nil {
return err return err
} }
return fmt.Errorf("Failed to write %d samples to OpenTSDB, %d succeeded", r["failed"], r["success"]) return fmt.Errorf("failed to write %d samples to OpenTSDB, %d succeeded", r["failed"], r["success"])
} }

View file

@ -47,7 +47,7 @@ type TSDBQueueManager struct {
drained chan bool drained chan bool
} }
// Build a new TSDBQueueManager. // NewTSDBQueueManager builds a new TSDBQueueManager.
func NewTSDBQueueManager(tsdb TSDBClient, queueCapacity int) *TSDBQueueManager { func NewTSDBQueueManager(tsdb TSDBClient, queueCapacity int) *TSDBQueueManager {
return &TSDBQueueManager{ return &TSDBQueueManager{
tsdb: tsdb, tsdb: tsdb,
@ -57,8 +57,8 @@ func NewTSDBQueueManager(tsdb TSDBClient, queueCapacity int) *TSDBQueueManager {
} }
} }
// Queue a sample batch to be sent to the TSDB. This drops the most recently // Queue queues a sample batch to be sent to the TSDB. It drops the most
// queued samples on the floor if the queue is full. // recently queued samples on the floor if the queue is full.
func (t *TSDBQueueManager) Queue(s clientmodel.Samples) { func (t *TSDBQueueManager) Queue(s clientmodel.Samples) {
select { select {
case t.queue <- s: case t.queue <- s:
@ -85,13 +85,13 @@ func (t *TSDBQueueManager) sendSamples(s clientmodel.Samples) {
} }
} }
// Report notification queue occupancy and capacity. // reportQueues reports notification queue occupancy and capacity.
func (t *TSDBQueueManager) reportQueues() { func (t *TSDBQueueManager) reportQueues() {
queueSize.Set(map[string]string{facet: occupancy}, float64(len(t.queue))) queueSize.Set(map[string]string{facet: occupancy}, float64(len(t.queue)))
queueSize.Set(map[string]string{facet: capacity}, float64(cap(t.queue))) queueSize.Set(map[string]string{facet: capacity}, float64(cap(t.queue)))
} }
// Continuously send samples to the TSDB. // Run continuously sends samples to the TSDB.
func (t *TSDBQueueManager) Run() { func (t *TSDBQueueManager) Run() {
defer func() { defer func() {
close(t.drained) close(t.drained)
@ -129,7 +129,7 @@ func (t *TSDBQueueManager) Run() {
} }
} }
// Flush remaining queued samples. // Flush flushes remaining queued samples.
func (t *TSDBQueueManager) flush() { func (t *TSDBQueueManager) flush() {
if len(t.pendingSamples) > 0 { if len(t.pendingSamples) > 0 {
go t.sendSamples(t.pendingSamples) go t.sendSamples(t.pendingSamples)
@ -137,7 +137,8 @@ func (t *TSDBQueueManager) flush() {
t.pendingSamples = t.pendingSamples[:0] t.pendingSamples = t.pendingSamples[:0]
} }
// Stop sending samples to the TSDB and wait for pending sends to complete. // Close stops sending samples to the TSDB and waits for pending sends to
// complete.
func (t *TSDBQueueManager) Close() { func (t *TSDBQueueManager) Close() {
glog.Infof("TSDB queue manager shutting down...") glog.Infof("TSDB queue manager shutting down...")
close(t.queue) close(t.queue)