Major code cleanup in storage.

- Mostly docstring fixed/additions.
  (Please review these carefully, since most of them were missing, I
  had to guess them from an outsider's perspective. (Which on the
  other hand proves how desperately required many of these docstrings
  are.))

- Removed all uses of new(...) to meet our own style guide (draft).

- Fixed all other 'go vet' and 'golint' issues (except those that are
  not fixable (i.e. caused by bugs in or by design of 'go vet' and
  'golint')).

- Some trivial refactorings, like reorder functions, minor renames, ...

- Some slightly less trivial refactoring, mostly to reduce code
  duplication by embedding types instead of writing many explicit
  forwarders.

- Cleaned up the interface structure a bit. (Most significant probably
  the removal of the View-like methods from MetricPersistenc. Now they
  are only in View and not duplicated anymore.)

- Removed dead code. (Probably not all of it, but it's a first
  step...)

- Fixed a leftover in storage/metric/end_to_end_test.go (that made
  some parts of the code never execute (incidentally, those parts
  were broken (and I fixed them, too))).

Change-Id: Ibcac069940d118a88f783314f5b4595dce6641d5
This commit is contained in:
Bjoern Rabenstein 2014-02-14 19:36:27 +01:00
parent 688f4f43c3
commit 6bc083f38b
33 changed files with 602 additions and 670 deletions

View file

@ -27,34 +27,34 @@ type FilterResult int
const (
// Stop scanning the database.
STOP FilterResult = iota
Stop FilterResult = iota
// Skip this record but continue scanning.
SKIP
Skip
// Accept this record for the Operator.
ACCEPT
Accept
)
func (f FilterResult) String() string {
switch f {
case STOP:
case Stop:
return "STOP"
case SKIP:
case Skip:
return "SKIP"
case ACCEPT:
case Accept:
return "ACCEPT"
}
panic("unknown")
}
type OperatorErrorType int
// OperatorError is used for storage operations upon errors that may or may not
// be continuable.
type OperatorError struct {
Error error
Continuable bool
}
// Filter is responsible for controlling the behavior of the database scan
// RecordFilter is responsible for controlling the behavior of the database scan
// process and determines the disposition of various records.
//
// The protocol around it makes the assumption that the underlying

View file

@ -60,7 +60,7 @@ func (c *compactionChecker) Operate(key, value interface{}) *storage.OperatorErr
if sampleKey.FirstTimestamp.After(sampleKey.LastTimestamp) {
c.t.Fatalf("Chunk FirstTimestamp (%v) is after LastTimestamp (%v): %v", sampleKey.FirstTimestamp.Unix(), sampleKey.LastTimestamp.Unix(), sampleKey)
}
fp := new(clientmodel.Fingerprint)
fp := &clientmodel.Fingerprint{}
for _, sample := range value.(Values) {
if sample.Timestamp.Before(sampleKey.FirstTimestamp) || sample.Timestamp.After(sampleKey.LastTimestamp) {
c.t.Fatalf("Sample not within chunk boundaries: chunk FirstTimestamp (%v), chunk LastTimestamp (%v) vs. sample Timestamp (%v)", sampleKey.FirstTimestamp.Unix(), sampleKey.LastTimestamp.Unix(), sample.Timestamp)

View file

@ -34,7 +34,7 @@ import (
const curationYieldPeriod = 250 * time.Millisecond
var errIllegalIterator = errors.New("Iterator invalid.")
var errIllegalIterator = errors.New("iterator invalid")
// CurationStateUpdater receives updates about the curation state.
type CurationStateUpdater interface {
@ -50,6 +50,7 @@ type CurationState struct {
Fingerprint *clientmodel.Fingerprint
}
// CuratorOptions bundles the parameters needed to create a Curator.
type CuratorOptions struct {
// Stop functions as a channel that when empty allows the curator to operate.
// The moment a value is ingested inside of it, the curator goes into drain
@ -59,7 +60,7 @@ type CuratorOptions struct {
ViewQueue chan viewJob
}
// curator is responsible for effectuating a given curation policy across the
// Curator is responsible for effectuating a given curation policy across the
// stored samples on-disk. This is useful to compact sparse sample values into
// single sample entities to reduce keyspace load on the datastore.
type Curator struct {
@ -71,6 +72,7 @@ type Curator struct {
sampleKeys *sampleKeyList
}
// NewCurator returns an initialized Curator.
func NewCurator(o *CuratorOptions) *Curator {
return &Curator{
stop: o.Stop,
@ -122,7 +124,7 @@ type watermarkScanner struct {
sampleKeys *sampleKeyList
}
// run facilitates the curation lifecycle.
// Run facilitates the curation lifecycle.
//
// recencyThreshold represents the most recent time up to which values will be
// curated.
@ -214,7 +216,7 @@ func (c *Curator) Run(ignoreYoungerThan time.Duration, instant clientmodel.Times
return
}
// drain instructs the curator to stop at the next convenient moment as to not
// Drain instructs the curator to stop at the next convenient moment as to not
// introduce data inconsistencies.
func (c *Curator) Drain() {
if len(c.stop) == 0 {
@ -222,34 +224,35 @@ func (c *Curator) Drain() {
}
}
// Close needs to be called to cleanly dispose of a curator.
func (c *Curator) Close() {
c.dtoSampleKeys.Close()
c.sampleKeys.Close()
}
func (w *watermarkScanner) DecodeKey(in interface{}) (interface{}, error) {
key := new(dto.Fingerprint)
key := &dto.Fingerprint{}
bytes := in.([]byte)
if err := proto.Unmarshal(bytes, key); err != nil {
return nil, err
}
fingerprint := new(clientmodel.Fingerprint)
fingerprint := &clientmodel.Fingerprint{}
loadFingerprint(fingerprint, key)
return fingerprint, nil
}
func (w *watermarkScanner) DecodeValue(in interface{}) (interface{}, error) {
value := new(dto.MetricHighWatermark)
value := &dto.MetricHighWatermark{}
bytes := in.([]byte)
if err := proto.Unmarshal(bytes, value); err != nil {
return nil, err
}
watermark := new(watermarks)
watermark := &watermarks{}
watermark.load(value)
return watermark, nil
@ -280,7 +283,7 @@ func (w *watermarkScanner) Filter(key, value interface{}) (r storage.FilterResul
}()
if w.shouldStop() {
return storage.STOP
return storage.Stop
}
k := &curationKey{
@ -295,24 +298,24 @@ func (w *watermarkScanner) Filter(key, value interface{}) (r storage.FilterResul
return
}
if !present {
return storage.ACCEPT
return storage.Accept
}
if !curationRemark.Before(w.stopAt) {
return storage.SKIP
return storage.Skip
}
watermark := value.(*watermarks)
if !curationRemark.Before(watermark.High) {
return storage.SKIP
return storage.Skip
}
curationConsistent, err := w.curationConsistent(fingerprint, watermark)
if err != nil {
return
}
if curationConsistent {
return storage.SKIP
return storage.Skip
}
return storage.ACCEPT
return storage.Accept
}
// curationConsistent determines whether the given metric is in a dirty state

View file

@ -272,8 +272,9 @@ func AppendRepeatingValuesTests(p MetricPersistence, t test.Tester) {
}
}
if true {
// XXX: Purely a benchmark.
v, ok := p.(View)
if !ok {
// It's purely a benchmark for a MetricPersistence that is not viewable.
return
}
@ -294,7 +295,7 @@ func AppendRepeatingValuesTests(p MetricPersistence, t test.Tester) {
}
time := clientmodel.Timestamp(0).Add(time.Duration(i) * time.Hour).Add(time.Duration(j) * time.Second)
samples := p.GetValueAtTime(fingerprints[0], time)
samples := v.GetValueAtTime(fingerprints[0], time)
if len(samples) == 0 {
t.Fatal("expected at least one sample.")
}
@ -303,7 +304,7 @@ func AppendRepeatingValuesTests(p MetricPersistence, t test.Tester) {
for _, sample := range samples {
if sample.Value != expected {
t.Fatalf("expected %d value, got %d", expected, sample.Value)
t.Fatalf("expected %v value, got %v", expected, sample.Value)
}
}
}
@ -334,8 +335,9 @@ func AppendsRepeatingValuesTests(p MetricPersistence, t test.Tester) {
p.AppendSamples(s)
if true {
// XXX: Purely a benchmark.
v, ok := p.(View)
if !ok {
// It's purely a benchmark for a MetricPersistance that is not viewable.
return
}
@ -356,7 +358,7 @@ func AppendsRepeatingValuesTests(p MetricPersistence, t test.Tester) {
}
time := clientmodel.Timestamp(0).Add(time.Duration(i) * time.Hour).Add(time.Duration(j) * time.Second)
samples := p.GetValueAtTime(fingerprints[0], time)
samples := v.GetValueAtTime(fingerprints[0], time)
if len(samples) == 0 {
t.Fatal("expected at least one sample.")
}
@ -365,7 +367,7 @@ func AppendsRepeatingValuesTests(p MetricPersistence, t test.Tester) {
for _, sample := range samples {
if sample.Value != expected {
t.Fatalf("expected %d value, got %d", expected, sample.Value)
t.Fatalf("expected %v value, got %v", expected, sample.Value)
}
}
}

View file

@ -34,7 +34,7 @@ func (l *dtoSampleKeyList) Get() (*dto.SampleKey, bool) {
return v.(*dto.SampleKey), ok
}
return new(dto.SampleKey), false
return &dto.SampleKey{}, false
}
func (l *dtoSampleKeyList) Give(v *dto.SampleKey) bool {
@ -51,7 +51,7 @@ type sampleKeyList struct {
l utility.FreeList
}
var defaultSampleKey = new(SampleKey)
var defaultSampleKey = &SampleKey{}
func newSampleKeyList(cap int) *sampleKeyList {
return &sampleKeyList{
@ -64,7 +64,7 @@ func (l *sampleKeyList) Get() (*SampleKey, bool) {
return v.(*SampleKey), ok
}
return new(SampleKey), false
return &SampleKey{}, false
}
func (l *sampleKeyList) Give(v *SampleKey) bool {
@ -86,10 +86,10 @@ func (l *valueAtTimeList) Get() (*getValuesAtTimeOp, bool) {
return v.(*getValuesAtTimeOp), ok
}
return new(getValuesAtTimeOp), false
return &getValuesAtTimeOp{}, false
}
var pGetValuesAtTimeOp = new(getValuesAtTimeOp)
var pGetValuesAtTimeOp = &getValuesAtTimeOp{}
func (l *valueAtTimeList) Give(v *getValuesAtTimeOp) bool {
*v = *pGetValuesAtTimeOp
@ -112,10 +112,10 @@ func (l *valueAtIntervalList) Get() (*getValuesAtIntervalOp, bool) {
return v.(*getValuesAtIntervalOp), ok
}
return new(getValuesAtIntervalOp), false
return &getValuesAtIntervalOp{}, false
}
var pGetValuesAtIntervalOp = new(getValuesAtIntervalOp)
var pGetValuesAtIntervalOp = &getValuesAtIntervalOp{}
func (l *valueAtIntervalList) Give(v *getValuesAtIntervalOp) bool {
*v = *pGetValuesAtIntervalOp
@ -138,10 +138,10 @@ func (l *valueAlongRangeList) Get() (*getValuesAlongRangeOp, bool) {
return v.(*getValuesAlongRangeOp), ok
}
return new(getValuesAlongRangeOp), false
return &getValuesAlongRangeOp{}, false
}
var pGetValuesAlongRangeOp = new(getValuesAlongRangeOp)
var pGetValuesAlongRangeOp = &getValuesAlongRangeOp{}
func (l *valueAlongRangeList) Give(v *getValuesAlongRangeOp) bool {
*v = *pGetValuesAlongRangeOp
@ -164,10 +164,10 @@ func (l *valueAtIntervalAlongRangeList) Get() (*getValueRangeAtIntervalOp, bool)
return v.(*getValueRangeAtIntervalOp), ok
}
return new(getValueRangeAtIntervalOp), false
return &getValueRangeAtIntervalOp{}, false
}
var pGetValueRangeAtIntervalOp = new(getValueRangeAtIntervalOp)
var pGetValueRangeAtIntervalOp = &getValueRangeAtIntervalOp{}
func (l *valueAtIntervalAlongRangeList) Give(v *getValueRangeAtIntervalOp) bool {
*v = *pGetValueRangeAtIntervalOp

View file

@ -21,8 +21,6 @@ import (
"code.google.com/p/goprotobuf/proto"
clientmodel "github.com/prometheus/client_golang/model"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/storage/raw"
"github.com/prometheus/prometheus/storage/raw/leveldb"
"github.com/prometheus/prometheus/utility"
@ -30,59 +28,52 @@ import (
dto "github.com/prometheus/prometheus/model/generated"
)
// FingerprintMetricMapping is an in-memory map of Fingerprints to Metrics.
type FingerprintMetricMapping map[clientmodel.Fingerprint]clientmodel.Metric
// FingerprintMetricIndex models a database mapping Fingerprints to Metrics.
type FingerprintMetricIndex interface {
raw.Database
raw.Pruner
IndexBatch(FingerprintMetricMapping) error
Lookup(*clientmodel.Fingerprint) (m clientmodel.Metric, ok bool, err error)
State() *raw.DatabaseState
Size() (s uint64, present bool, err error)
}
// LevelDBFingerprintMetricIndex implements FingerprintMetricIndex using
// leveldb.
type LevelDBFingerprintMetricIndex struct {
p *leveldb.LevelDBPersistence
*leveldb.LevelDBPersistence
}
// LevelDBFingerprintMetricIndexOptions just wraps leveldb.LevelDBOptions.
type LevelDBFingerprintMetricIndexOptions struct {
leveldb.LevelDBOptions
}
func (i *LevelDBFingerprintMetricIndex) Close() {
i.p.Close()
}
func (i *LevelDBFingerprintMetricIndex) State() *raw.DatabaseState {
return i.p.State()
}
func (i *LevelDBFingerprintMetricIndex) Size() (uint64, bool, error) {
s, err := i.p.Size()
return s, true, err
}
// IndexBatch implements FingerprintMetricIndex.
func (i *LevelDBFingerprintMetricIndex) IndexBatch(mapping FingerprintMetricMapping) error {
b := leveldb.NewBatch()
defer b.Close()
for f, m := range mapping {
k := new(dto.Fingerprint)
k := &dto.Fingerprint{}
dumpFingerprint(k, &f)
v := new(dto.Metric)
v := &dto.Metric{}
dumpMetric(v, m)
b.Put(k, v)
}
return i.p.Commit(b)
return i.LevelDBPersistence.Commit(b)
}
// Lookup implements FingerprintMetricIndex.
func (i *LevelDBFingerprintMetricIndex) Lookup(f *clientmodel.Fingerprint) (m clientmodel.Metric, ok bool, err error) {
k := new(dto.Fingerprint)
k := &dto.Fingerprint{}
dumpFingerprint(k, f)
v := new(dto.Metric)
if ok, err := i.p.Get(k, v); !ok {
v := &dto.Metric{}
if ok, err := i.LevelDBPersistence.Get(k, v); !ok {
return nil, false, nil
} else if err != nil {
return nil, false, err
@ -97,12 +88,8 @@ func (i *LevelDBFingerprintMetricIndex) Lookup(f *clientmodel.Fingerprint) (m cl
return m, true, nil
}
func (i *LevelDBFingerprintMetricIndex) Prune() (bool, error) {
i.p.Prune()
return false, nil
}
// NewLevelDBFingerprintMetricIndex returns a LevelDBFingerprintMetricIndex
// object ready to use.
func NewLevelDBFingerprintMetricIndex(o LevelDBFingerprintMetricIndexOptions) (*LevelDBFingerprintMetricIndex, error) {
s, err := leveldb.NewLevelDBPersistence(o.LevelDBOptions)
if err != nil {
@ -110,26 +97,32 @@ func NewLevelDBFingerprintMetricIndex(o LevelDBFingerprintMetricIndexOptions) (*
}
return &LevelDBFingerprintMetricIndex{
p: s,
LevelDBPersistence: s,
}, nil
}
// LabelNameFingerprintMapping is an in-memory map of LabelNames to
// Fingerprints.
type LabelNameFingerprintMapping map[clientmodel.LabelName]clientmodel.Fingerprints
// LabelNameFingerprintIndex models a database mapping LabelNames to
// Fingerprints.
type LabelNameFingerprintIndex interface {
raw.Database
raw.Pruner
IndexBatch(LabelNameFingerprintMapping) error
Lookup(clientmodel.LabelName) (fps clientmodel.Fingerprints, ok bool, err error)
Has(clientmodel.LabelName) (ok bool, err error)
State() *raw.DatabaseState
Size() (s uint64, present bool, err error)
}
// LevelDBLabelNameFingerprintIndex implements LabelNameFingerprintIndex using
// leveldb.
type LevelDBLabelNameFingerprintIndex struct {
p *leveldb.LevelDBPersistence
*leveldb.LevelDBPersistence
}
// IndexBatch implements LabelNameFingerprintIndex.
func (i *LevelDBLabelNameFingerprintIndex) IndexBatch(b LabelNameFingerprintMapping) error {
batch := leveldb.NewBatch()
defer batch.Close()
@ -140,9 +133,9 @@ func (i *LevelDBLabelNameFingerprintIndex) IndexBatch(b LabelNameFingerprintMapp
key := &dto.LabelName{
Name: proto.String(string(labelName)),
}
value := new(dto.FingerprintCollection)
value := &dto.FingerprintCollection{}
for _, fingerprint := range fingerprints {
f := new(dto.Fingerprint)
f := &dto.Fingerprint{}
dumpFingerprint(f, fingerprint)
value.Member = append(value.Member, f)
}
@ -150,14 +143,15 @@ func (i *LevelDBLabelNameFingerprintIndex) IndexBatch(b LabelNameFingerprintMapp
batch.Put(key, value)
}
return i.p.Commit(batch)
return i.LevelDBPersistence.Commit(batch)
}
// Lookup implements LabelNameFingerprintIndex.
func (i *LevelDBLabelNameFingerprintIndex) Lookup(l clientmodel.LabelName) (fps clientmodel.Fingerprints, ok bool, err error) {
k := new(dto.LabelName)
k := &dto.LabelName{}
dumpLabelName(k, l)
v := new(dto.FingerprintCollection)
ok, err = i.p.Get(k, v)
v := &dto.FingerprintCollection{}
ok, err = i.LevelDBPersistence.Get(k, v)
if err != nil {
return nil, false, err
}
@ -166,7 +160,7 @@ func (i *LevelDBLabelNameFingerprintIndex) Lookup(l clientmodel.LabelName) (fps
}
for _, m := range v.Member {
fp := new(clientmodel.Fingerprint)
fp := &clientmodel.Fingerprint{}
loadFingerprint(fp, m)
fps = append(fps, fp)
}
@ -174,35 +168,20 @@ func (i *LevelDBLabelNameFingerprintIndex) Lookup(l clientmodel.LabelName) (fps
return fps, true, nil
}
// Has implements LabelNameFingerprintIndex.
func (i *LevelDBLabelNameFingerprintIndex) Has(l clientmodel.LabelName) (ok bool, err error) {
return i.p.Has(&dto.LabelName{
return i.LevelDBPersistence.Has(&dto.LabelName{
Name: proto.String(string(l)),
})
}
func (i *LevelDBLabelNameFingerprintIndex) Prune() (bool, error) {
i.p.Prune()
return false, nil
}
func (i *LevelDBLabelNameFingerprintIndex) Close() {
i.p.Close()
}
func (i *LevelDBLabelNameFingerprintIndex) Size() (uint64, bool, error) {
s, err := i.p.Size()
return s, true, err
}
func (i *LevelDBLabelNameFingerprintIndex) State() *raw.DatabaseState {
return i.p.State()
}
// LevelDBLabelNameFingerprintIndexOptions just wraps leveldb.LevelDBOptions.
type LevelDBLabelNameFingerprintIndexOptions struct {
leveldb.LevelDBOptions
}
// NewLevelLabelNameFingerprintIndex returns a LevelDBLabelNameFingerprintIndex
// ready to use.
func NewLevelLabelNameFingerprintIndex(o LevelDBLabelNameFingerprintIndexOptions) (*LevelDBLabelNameFingerprintIndex, error) {
s, err := leveldb.NewLevelDBPersistence(o.LevelDBOptions)
if err != nil {
@ -210,31 +189,38 @@ func NewLevelLabelNameFingerprintIndex(o LevelDBLabelNameFingerprintIndexOptions
}
return &LevelDBLabelNameFingerprintIndex{
p: s,
LevelDBPersistence: s,
}, nil
}
// LabelPairFingerprintMapping is an in-memory map of LabelPairs to
// Fingerprints.
type LabelPairFingerprintMapping map[LabelPair]clientmodel.Fingerprints
// LabelPairFingerprintIndex models a database mapping LabelPairs to
// Fingerprints.
type LabelPairFingerprintIndex interface {
raw.Database
raw.ForEacher
raw.Pruner
IndexBatch(LabelPairFingerprintMapping) error
Lookup(*LabelPair) (m clientmodel.Fingerprints, ok bool, err error)
Has(*LabelPair) (ok bool, err error)
State() *raw.DatabaseState
Size() (s uint64, present bool, err error)
}
// LevelDBLabelPairFingerprintIndex implements LabelPairFingerprintIndex using
// leveldb.
type LevelDBLabelPairFingerprintIndex struct {
p *leveldb.LevelDBPersistence
*leveldb.LevelDBPersistence
}
// LevelDBLabelSetFingerprintIndexOptions just wraps leveldb.LevelDBOptions.
type LevelDBLabelSetFingerprintIndexOptions struct {
leveldb.LevelDBOptions
}
// IndexBatch implements LabelPairFingerprintMapping.
func (i *LevelDBLabelPairFingerprintIndex) IndexBatch(m LabelPairFingerprintMapping) error {
batch := leveldb.NewBatch()
defer batch.Close()
@ -246,9 +232,9 @@ func (i *LevelDBLabelPairFingerprintIndex) IndexBatch(m LabelPairFingerprintMapp
Name: proto.String(string(pair.Name)),
Value: proto.String(string(pair.Value)),
}
value := new(dto.FingerprintCollection)
value := &dto.FingerprintCollection{}
for _, fp := range fps {
f := new(dto.Fingerprint)
f := &dto.Fingerprint{}
dumpFingerprint(f, fp)
value.Member = append(value.Member, f)
}
@ -256,17 +242,18 @@ func (i *LevelDBLabelPairFingerprintIndex) IndexBatch(m LabelPairFingerprintMapp
batch.Put(key, value)
}
return i.p.Commit(batch)
return i.LevelDBPersistence.Commit(batch)
}
// Lookup implements LabelPairFingerprintMapping.
func (i *LevelDBLabelPairFingerprintIndex) Lookup(p *LabelPair) (m clientmodel.Fingerprints, ok bool, err error) {
k := &dto.LabelPair{
Name: proto.String(string(p.Name)),
Value: proto.String(string(p.Value)),
}
v := new(dto.FingerprintCollection)
v := &dto.FingerprintCollection{}
ok, err = i.p.Get(k, v)
ok, err = i.LevelDBPersistence.Get(k, v)
if !ok {
return nil, false, nil
@ -276,7 +263,7 @@ func (i *LevelDBLabelPairFingerprintIndex) Lookup(p *LabelPair) (m clientmodel.F
}
for _, pair := range v.Member {
fp := new(clientmodel.Fingerprint)
fp := &clientmodel.Fingerprint{}
loadFingerprint(fp, pair)
m = append(m, fp)
}
@ -284,37 +271,18 @@ func (i *LevelDBLabelPairFingerprintIndex) Lookup(p *LabelPair) (m clientmodel.F
return m, true, nil
}
// Has implements LabelPairFingerprintMapping.
func (i *LevelDBLabelPairFingerprintIndex) Has(p *LabelPair) (ok bool, err error) {
k := &dto.LabelPair{
Name: proto.String(string(p.Name)),
Value: proto.String(string(p.Value)),
}
return i.p.Has(k)
}
func (i *LevelDBLabelPairFingerprintIndex) ForEach(d storage.RecordDecoder, f storage.RecordFilter, o storage.RecordOperator) (bool, error) {
return i.p.ForEach(d, f, o)
}
func (i *LevelDBLabelPairFingerprintIndex) Prune() (bool, error) {
i.p.Prune()
return false, nil
}
func (i *LevelDBLabelPairFingerprintIndex) Close() {
i.p.Close()
}
func (i *LevelDBLabelPairFingerprintIndex) Size() (uint64, bool, error) {
s, err := i.p.Size()
return s, true, err
}
func (i *LevelDBLabelPairFingerprintIndex) State() *raw.DatabaseState {
return i.p.State()
return i.LevelDBPersistence.Has(k)
}
// NewLevelDBLabelSetFingerprintIndex returns a LevelDBLabelPairFingerprintIndex
// object ready to use.
func NewLevelDBLabelSetFingerprintIndex(o LevelDBLabelSetFingerprintIndexOptions) (*LevelDBLabelPairFingerprintIndex, error) {
s, err := leveldb.NewLevelDBPersistence(o.LevelDBOptions)
if err != nil {
@ -322,68 +290,55 @@ func NewLevelDBLabelSetFingerprintIndex(o LevelDBLabelSetFingerprintIndexOptions
}
return &LevelDBLabelPairFingerprintIndex{
p: s,
LevelDBPersistence: s,
}, nil
}
// MetricMembershipIndex models a database tracking the existence of Metrics.
type MetricMembershipIndex interface {
raw.Database
raw.Pruner
IndexBatch(FingerprintMetricMapping) error
Has(clientmodel.Metric) (ok bool, err error)
State() *raw.DatabaseState
Size() (s uint64, present bool, err error)
}
// LevelDBMetricMembershipIndex implements MetricMembershipIndex using leveldb.
type LevelDBMetricMembershipIndex struct {
p *leveldb.LevelDBPersistence
*leveldb.LevelDBPersistence
}
var existenceIdentity = new(dto.MembershipIndexValue)
var existenceIdentity = &dto.MembershipIndexValue{}
// IndexBatch implements MetricMembershipIndex.
func (i *LevelDBMetricMembershipIndex) IndexBatch(b FingerprintMetricMapping) error {
batch := leveldb.NewBatch()
defer batch.Close()
for _, m := range b {
k := new(dto.Metric)
k := &dto.Metric{}
dumpMetric(k, m)
batch.Put(k, existenceIdentity)
}
return i.p.Commit(batch)
return i.LevelDBPersistence.Commit(batch)
}
// Has implements MetricMembershipIndex.
func (i *LevelDBMetricMembershipIndex) Has(m clientmodel.Metric) (ok bool, err error) {
k := new(dto.Metric)
k := &dto.Metric{}
dumpMetric(k, m)
return i.p.Has(k)
}
func (i *LevelDBMetricMembershipIndex) Close() {
i.p.Close()
}
func (i *LevelDBMetricMembershipIndex) Size() (uint64, bool, error) {
s, err := i.p.Size()
return s, true, err
}
func (i *LevelDBMetricMembershipIndex) State() *raw.DatabaseState {
return i.p.State()
}
func (i *LevelDBMetricMembershipIndex) Prune() (bool, error) {
i.p.Prune()
return false, nil
return i.LevelDBPersistence.Has(k)
}
// LevelDBMetricMembershipIndexOptions just wraps leveldb.LevelDBOptions
type LevelDBMetricMembershipIndexOptions struct {
leveldb.LevelDBOptions
}
// NewLevelDBMetricMembershipIndex returns a LevelDBMetricMembershipIndex object
// ready to use.
func NewLevelDBMetricMembershipIndex(o LevelDBMetricMembershipIndexOptions) (*LevelDBMetricMembershipIndex, error) {
s, err := leveldb.NewLevelDBPersistence(o.LevelDBOptions)
if err != nil {
@ -391,7 +346,7 @@ func NewLevelDBMetricMembershipIndex(o LevelDBMetricMembershipIndexOptions) (*Le
}
return &LevelDBMetricMembershipIndex{
p: s,
LevelDBPersistence: s,
}, nil
}
@ -402,7 +357,7 @@ type MetricIndexer interface {
IndexMetrics(FingerprintMetricMapping) error
}
// IndexObserver listens and receives changes to a given
// IndexerObserver listens and receives changes to a given
// FingerprintMetricMapping.
type IndexerObserver interface {
Observe(FingerprintMetricMapping) error
@ -422,6 +377,8 @@ type IndexerProxy struct {
observers []IndexerObserver
}
// IndexMetrics proxies the given FingerprintMetricMapping to the underlying
// MetricIndexer and calls all registered observers with it.
func (p *IndexerProxy) IndexMetrics(b FingerprintMetricMapping) error {
if p.err != nil {
return p.err
@ -451,7 +408,7 @@ func (p *IndexerProxy) Close() error {
return nil
}
// Close flushes the underlying index requests before closing.
// Flush flushes the underlying index requests before closing.
func (p *IndexerProxy) Flush() error {
if p.err != nil {
return p.err
@ -477,6 +434,8 @@ type SynchronizedIndexer struct {
i MetricIndexer
}
// IndexMetrics calls IndexMetrics of the wrapped MetricIndexer after acquiring
// a lock.
func (i *SynchronizedIndexer) IndexMetrics(b FingerprintMetricMapping) error {
i.mu.Lock()
defer i.mu.Unlock()
@ -488,6 +447,8 @@ type flusher interface {
Flush() error
}
// Flush calls Flush of the wrapped MetricIndexer after acquiring a lock. If the
// wrapped MetricIndexer has no Flush method, this is a no-op.
func (i *SynchronizedIndexer) Flush() error {
if flusher, ok := i.i.(flusher); ok {
i.mu.Lock()
@ -499,6 +460,8 @@ func (i *SynchronizedIndexer) Flush() error {
return nil
}
// Close calls Close of the wrapped MetricIndexer after acquiring a lock. If the
// wrapped MetricIndexer has no Close method, this is a no-op.
func (i *SynchronizedIndexer) Close() error {
if closer, ok := i.i.(io.Closer); ok {
i.mu.Lock()
@ -510,7 +473,8 @@ func (i *SynchronizedIndexer) Close() error {
return nil
}
// NewSynchronizedIndexer builds a new MetricIndexer.
// NewSynchronizedIndexer returns a SynchronizedIndexer wrapping the given
// MetricIndexer.
func NewSynchronizedIndexer(i MetricIndexer) *SynchronizedIndexer {
return &SynchronizedIndexer{
i: i,
@ -531,6 +495,8 @@ type BufferedIndexer struct {
err error
}
// IndexMetrics writes the entries in the given FingerprintMetricMapping to the
// index.
func (i *BufferedIndexer) IndexMetrics(b FingerprintMetricMapping) error {
if i.err != nil {
return i.err
@ -542,8 +508,6 @@ func (i *BufferedIndexer) IndexMetrics(b FingerprintMetricMapping) error {
return nil
}
i.buf = append(i.buf)
i.err = i.Flush()
return i.err
@ -590,8 +554,8 @@ func (i *BufferedIndexer) Close() error {
return nil
}
// NewBufferedIndexer returns a BufferedIndexer ready to use.
func NewBufferedIndexer(i MetricIndexer, limit int) *BufferedIndexer {
return &BufferedIndexer{
i: i,
limit: limit,
@ -715,6 +679,8 @@ func extendLabelPairIndex(i LabelPairFingerprintIndex, b FingerprintMetricMappin
return batch, nil
}
// IndexMetrics adds the facets of all unindexed metrics found in the given
// FingerprintMetricMapping to the corresponding indices.
func (i *TotalIndexer) IndexMetrics(b FingerprintMetricMapping) error {
unindexed, err := findUnindexed(i.MetricMembership, b)
if err != nil {

View file

@ -13,11 +13,7 @@
package metric
import (
clientmodel "github.com/prometheus/client_golang/model"
"github.com/prometheus/prometheus/storage"
)
import clientmodel "github.com/prometheus/client_golang/model"
// AppendBatch models a batch of samples to be stored.
type AppendBatch map[clientmodel.Fingerprint]SampleSet
@ -32,24 +28,17 @@ type MetricPersistence interface {
// Record a group of new samples in the storage layer.
AppendSamples(clientmodel.Samples) error
// Get all of the metric fingerprints that are associated with the provided
// label set.
// Get all of the metric fingerprints that are associated with the
// provided label set.
GetFingerprintsForLabelSet(clientmodel.LabelSet) (clientmodel.Fingerprints, error)
// Get all of the metric fingerprints that are associated for a given label
// name.
// Get all of the metric fingerprints that are associated for a given
// label name.
GetFingerprintsForLabelName(clientmodel.LabelName) (clientmodel.Fingerprints, error)
// Get the metric associated with the provided fingerprint.
GetMetricForFingerprint(*clientmodel.Fingerprint) (clientmodel.Metric, error)
// Get the two metric values that are immediately adjacent to a given time.
GetValueAtTime(*clientmodel.Fingerprint, clientmodel.Timestamp) Values
// Get the boundary values of an interval: the first value older than the
// interval start, and the first value younger than the interval end.
GetBoundaryValues(*clientmodel.Fingerprint, Interval) Values
// Get all values contained within a provided interval.
GetRangeValues(*clientmodel.Fingerprint, Interval) Values
// Get all label values that are associated with a given label name.
GetAllValuesForLabel(clientmodel.LabelName) (clientmodel.LabelValues, error)
}
@ -57,19 +46,19 @@ type MetricPersistence interface {
// View provides a view of the values in the datastore subject to the request
// of a preloading operation.
type View interface {
// Get the two values that are immediately adjacent to a given time.
GetValueAtTime(*clientmodel.Fingerprint, clientmodel.Timestamp) Values
// Get the boundary values of an interval: the first value older than
// the interval start, and the first value younger than the interval
// end.
GetBoundaryValues(*clientmodel.Fingerprint, Interval) Values
// Get all values contained within a provided interval.
GetRangeValues(*clientmodel.Fingerprint, Interval) Values
// Destroy this view.
Close()
}
type Series interface {
Fingerprint() *clientmodel.Fingerprint
Metric() clientmodel.Metric
}
type IteratorsForFingerprintBuilder interface {
ForStream(stream *stream) (storage.RecordDecoder, storage.RecordFilter, storage.RecordOperator)
// ViewableMetricPersistence is a MetricPersistence that is able to present the
// samples it has stored as a View.
type ViewableMetricPersistence interface {
MetricPersistence
View
}

View file

@ -17,11 +17,14 @@ import (
clientmodel "github.com/prometheus/client_golang/model"
)
// LabelPair pairs a name with a value.
type LabelPair struct {
Name clientmodel.LabelName
Value clientmodel.LabelValue
}
// Equal returns true iff both the Name and the Value of this LabelPair and o
// are equal.
func (l *LabelPair) Equal(o *LabelPair) bool {
switch {
case l.Name != o.Name:
@ -33,6 +36,8 @@ func (l *LabelPair) Equal(o *LabelPair) bool {
}
}
// LabelPairs is a sortable slice of LabelPair pointers. It implements
// sort.Interface.
type LabelPairs []*LabelPair
func (l LabelPairs) Len() int {

View file

@ -35,11 +35,12 @@ import (
const sortConcurrency = 2
// LevelDBMetricPersistence is a leveldb-backed persistence layer for metrics.
type LevelDBMetricPersistence struct {
CurationRemarks CurationRemarker
FingerprintToMetrics FingerprintMetricIndex
LabelNameToFingerprints LabelNameFingerprintIndex
LabelSetToFingerprints LabelPairFingerprintIndex
LabelPairToFingerprints LabelPairFingerprintIndex
MetricHighWatermarks HighWatermarker
MetricMembershipIndex MetricMembershipIndex
@ -63,8 +64,8 @@ type LevelDBMetricPersistence struct {
var (
leveldbChunkSize = flag.Int("leveldbChunkSize", 200, "Maximum number of samples stored under one key.")
// These flag values are back of the envelope, though they seem sensible.
// Please re-evaluate based on your own needs.
// These flag values are back of the envelope, though they seem
// sensible. Please re-evaluate based on your own needs.
curationRemarksCacheSize = flag.Int("curationRemarksCacheSize", 5*1024*1024, "The size for the curation remarks cache (bytes).")
fingerprintsToLabelPairCacheSize = flag.Int("fingerprintsToLabelPairCacheSizeBytes", 25*1024*1024, "The size for the fingerprint to label pair index (bytes).")
highWatermarkCacheSize = flag.Int("highWatermarksByFingerprintSizeBytes", 5*1024*1024, "The size for the metric high watermarks (bytes).")
@ -75,19 +76,15 @@ var (
)
type leveldbOpener func()
type errorCloser interface {
Close() error
}
type closer interface {
Close()
}
// Close closes all the underlying persistence layers. It implements the
// MetricPersistence interface.
func (l *LevelDBMetricPersistence) Close() {
var persistences = []interface{}{
var persistences = []raw.Database{
l.CurationRemarks,
l.FingerprintToMetrics,
l.LabelNameToFingerprints,
l.LabelSetToFingerprints,
l.LabelPairToFingerprints,
l.MetricHighWatermarks,
l.MetricMembershipIndex,
l.MetricSamples,
@ -97,17 +94,12 @@ func (l *LevelDBMetricPersistence) Close() {
for _, c := range persistences {
closerGroup.Add(1)
go func(c interface{}) {
go func(c raw.Database) {
if c != nil {
switch closer := c.(type) {
case closer:
closer.Close()
case errorCloser:
if err := closer.Close(); err != nil {
if err := c.Close(); err != nil {
glog.Error("Error closing persistence: ", err)
}
}
}
closerGroup.Done()
}(c)
}
@ -115,10 +107,12 @@ func (l *LevelDBMetricPersistence) Close() {
closerGroup.Wait()
}
// NewLevelDBMetricPersistence returns a LevelDBMetricPersistence object ready
// to use.
func NewLevelDBMetricPersistence(baseDirectory string) (*LevelDBMetricPersistence, error) {
workers := utility.NewUncertaintyGroup(7)
emission := new(LevelDBMetricPersistence)
emission := &LevelDBMetricPersistence{}
var subsystemOpeners = []struct {
name string
@ -185,7 +179,7 @@ func NewLevelDBMetricPersistence(baseDirectory string) (*LevelDBMetricPersistenc
"Fingerprints by Label Name and Value Pair",
func() {
var err error
emission.LabelSetToFingerprints, err = NewLevelDBLabelSetFingerprintIndex(LevelDBLabelSetFingerprintIndexOptions{
emission.LabelPairToFingerprints, err = NewLevelDBLabelSetFingerprintIndex(LevelDBLabelSetFingerprintIndexOptions{
LevelDBOptions: leveldb.LevelDBOptions{
Name: "Fingerprints by Label Pair",
Purpose: "Index",
@ -239,19 +233,20 @@ func NewLevelDBMetricPersistence(baseDirectory string) (*LevelDBMetricPersistenc
glog.Error("Could not open storage: ", err)
}
return nil, fmt.Errorf("Unable to open metric persistence.")
return nil, fmt.Errorf("unable to open metric persistence")
}
emission.Indexer = &TotalIndexer{
FingerprintToMetric: emission.FingerprintToMetrics,
LabelNameToFingerprint: emission.LabelNameToFingerprints,
LabelPairToFingerprint: emission.LabelSetToFingerprints,
LabelPairToFingerprint: emission.LabelPairToFingerprints,
MetricMembership: emission.MetricMembershipIndex,
}
return emission, nil
}
// AppendSample implements the MetricPersistence interface.
func (l *LevelDBMetricPersistence) AppendSample(sample *clientmodel.Sample) (err error) {
defer func(begin time.Time) {
duration := time.Since(begin)
@ -317,6 +312,7 @@ func (l *LevelDBMetricPersistence) refreshHighWatermarks(groups map[clientmodel.
return l.MetricHighWatermarks.UpdateBatch(b)
}
// AppendSamples appends the given Samples to the database and indexes them.
func (l *LevelDBMetricPersistence) AppendSamples(samples clientmodel.Samples) (err error) {
defer func(begin time.Time) {
duration := time.Since(begin)
@ -345,9 +341,9 @@ func (l *LevelDBMetricPersistence) AppendSamples(samples clientmodel.Samples) (e
samplesBatch := leveldb.NewBatch()
defer samplesBatch.Close()
key := new(SampleKey)
keyDto := new(dto.SampleKey)
value := new(dto.SampleValueSeries)
key := &SampleKey{}
keyDto := &dto.SampleKey{}
value := &dto.SampleValueSeries{}
for fingerprint, group := range fingerprintToSamples {
for {
@ -434,6 +430,8 @@ func (l *LevelDBMetricPersistence) hasIndexMetric(m clientmodel.Metric) (value b
return l.MetricMembershipIndex.Has(m)
}
// HasLabelPair returns true if the given LabelPair is present in the underlying
// LabelPair index.
func (l *LevelDBMetricPersistence) HasLabelPair(p *LabelPair) (value bool, err error) {
defer func(begin time.Time) {
duration := time.Since(begin)
@ -441,9 +439,11 @@ func (l *LevelDBMetricPersistence) HasLabelPair(p *LabelPair) (value bool, err e
recordOutcome(duration, err, map[string]string{operation: hasLabelPair, result: success}, map[string]string{operation: hasLabelPair, result: failure})
}(time.Now())
return l.LabelSetToFingerprints.Has(p)
return l.LabelPairToFingerprints.Has(p)
}
// HasLabelName returns true if the given LabelName is present in the underlying
// LabelName index.
func (l *LevelDBMetricPersistence) HasLabelName(n clientmodel.LabelName) (value bool, err error) {
defer func(begin time.Time) {
duration := time.Since(begin)
@ -456,6 +456,9 @@ func (l *LevelDBMetricPersistence) HasLabelName(n clientmodel.LabelName) (value
return
}
// GetFingerprintsForLabelSet returns the Fingerprints for the given LabelSet by
// querying the underlying LabelPairFingerprintIndex for each LabelPair
// contained in LabelSet. It implements the MetricPersistence interface.
func (l *LevelDBMetricPersistence) GetFingerprintsForLabelSet(labelSet clientmodel.LabelSet) (fps clientmodel.Fingerprints, err error) {
defer func(begin time.Time) {
duration := time.Since(begin)
@ -466,7 +469,7 @@ func (l *LevelDBMetricPersistence) GetFingerprintsForLabelSet(labelSet clientmod
sets := []utility.Set{}
for name, value := range labelSet {
fps, _, err := l.LabelSetToFingerprints.Lookup(&LabelPair{
fps, _, err := l.LabelPairToFingerprints.Lookup(&LabelPair{
Name: name,
Value: value,
})
@ -500,6 +503,9 @@ func (l *LevelDBMetricPersistence) GetFingerprintsForLabelSet(labelSet clientmod
return fps, nil
}
// GetFingerprintsForLabelName returns the Fingerprints for the given LabelName
// from the underlying LabelNameFingerprintIndex. It implements the
// MetricPersistence interface.
func (l *LevelDBMetricPersistence) GetFingerprintsForLabelName(labelName clientmodel.LabelName) (fps clientmodel.Fingerprints, err error) {
defer func(begin time.Time) {
duration := time.Since(begin)
@ -513,6 +519,9 @@ func (l *LevelDBMetricPersistence) GetFingerprintsForLabelName(labelName clientm
return fps, err
}
// GetMetricForFingerprint returns the Metric for the given Fingerprint from the
// underlying FingerprintMetricIndex. It implements the MetricPersistence
// interface.
func (l *LevelDBMetricPersistence) GetMetricForFingerprint(f *clientmodel.Fingerprint) (m clientmodel.Metric, err error) {
defer func(begin time.Time) {
duration := time.Since(begin)
@ -526,68 +535,15 @@ func (l *LevelDBMetricPersistence) GetMetricForFingerprint(f *clientmodel.Finger
return m, nil
}
func (l *LevelDBMetricPersistence) GetValueAtTime(f *clientmodel.Fingerprint, t clientmodel.Timestamp) Values {
panic("Not implemented")
}
func (l *LevelDBMetricPersistence) GetBoundaryValues(f *clientmodel.Fingerprint, i Interval) Values {
panic("Not implemented")
}
func (l *LevelDBMetricPersistence) GetRangeValues(f *clientmodel.Fingerprint, i Interval) Values {
panic("Not implemented")
}
type MetricKeyDecoder struct{}
func (d *MetricKeyDecoder) DecodeKey(in interface{}) (out interface{}, err error) {
unmarshaled := dto.LabelPair{}
err = proto.Unmarshal(in.([]byte), &unmarshaled)
if err != nil {
return
}
out = LabelPair{
Name: clientmodel.LabelName(*unmarshaled.Name),
Value: clientmodel.LabelValue(*unmarshaled.Value),
}
return
}
func (d *MetricKeyDecoder) DecodeValue(in interface{}) (out interface{}, err error) {
return
}
type LabelNameFilter struct {
labelName clientmodel.LabelName
}
func (f LabelNameFilter) Filter(key, value interface{}) (filterResult storage.FilterResult) {
labelPair, ok := key.(LabelPair)
if ok && labelPair.Name == f.labelName {
return storage.ACCEPT
}
return storage.SKIP
}
type CollectLabelValuesOp struct {
labelValues []clientmodel.LabelValue
}
func (op *CollectLabelValuesOp) Operate(key, value interface{}) (err *storage.OperatorError) {
labelPair := key.(LabelPair)
op.labelValues = append(op.labelValues, clientmodel.LabelValue(labelPair.Value))
return
}
// GetAllValuesForLabel gets all label values that are associated with the
// provided label name.
func (l *LevelDBMetricPersistence) GetAllValuesForLabel(labelName clientmodel.LabelName) (values clientmodel.LabelValues, err error) {
filter := &LabelNameFilter{
labelName: labelName,
}
labelValuesOp := &CollectLabelValuesOp{}
_, err = l.LabelSetToFingerprints.ForEach(&MetricKeyDecoder{}, filter, labelValuesOp)
_, err = l.LabelPairToFingerprints.ForEach(&MetricKeyDecoder{}, filter, labelValuesOp)
if err != nil {
return
}
@ -604,41 +560,42 @@ func (l *LevelDBMetricPersistence) Prune() {
l.CurationRemarks.Prune()
l.FingerprintToMetrics.Prune()
l.LabelNameToFingerprints.Prune()
l.LabelSetToFingerprints.Prune()
l.LabelPairToFingerprints.Prune()
l.MetricHighWatermarks.Prune()
l.MetricMembershipIndex.Prune()
l.MetricSamples.Prune()
}
// Sizes returns the sum of all sizes of the underlying databases.
func (l *LevelDBMetricPersistence) Sizes() (total uint64, err error) {
size := uint64(0)
if size, _, err = l.CurationRemarks.Size(); err != nil {
if size, err = l.CurationRemarks.Size(); err != nil {
return 0, err
}
total += size
if size, _, err = l.FingerprintToMetrics.Size(); err != nil {
if size, err = l.FingerprintToMetrics.Size(); err != nil {
return 0, err
}
total += size
if size, _, err = l.LabelNameToFingerprints.Size(); err != nil {
if size, err = l.LabelNameToFingerprints.Size(); err != nil {
return 0, err
}
total += size
if size, _, err = l.LabelSetToFingerprints.Size(); err != nil {
if size, err = l.LabelPairToFingerprints.Size(); err != nil {
return 0, err
}
total += size
if size, _, err = l.MetricHighWatermarks.Size(); err != nil {
if size, err = l.MetricHighWatermarks.Size(); err != nil {
return 0, err
}
total += size
if size, _, err = l.MetricMembershipIndex.Size(); err != nil {
if size, err = l.MetricMembershipIndex.Size(); err != nil {
return 0, err
}
total += size
@ -651,20 +608,64 @@ func (l *LevelDBMetricPersistence) Sizes() (total uint64, err error) {
return total, nil
}
// States returns the DatabaseStates of all underlying databases.
func (l *LevelDBMetricPersistence) States() raw.DatabaseStates {
return raw.DatabaseStates{
l.CurationRemarks.State(),
l.FingerprintToMetrics.State(),
l.LabelNameToFingerprints.State(),
l.LabelSetToFingerprints.State(),
l.LabelPairToFingerprints.State(),
l.MetricHighWatermarks.State(),
l.MetricMembershipIndex.State(),
l.MetricSamples.State(),
}
}
// CollectLabelValuesOp implements storage.RecordOperator. It collects the
// encountered LabelValues in a slice.
type CollectLabelValuesOp struct {
labelValues []clientmodel.LabelValue
}
// Operate implements storage.RecordOperator. 'key' is required to be a
// LabelPair. Its Value is appended to a slice of collected LabelValues.
func (op *CollectLabelValuesOp) Operate(key, value interface{}) (err *storage.OperatorError) {
labelPair := key.(LabelPair)
op.labelValues = append(op.labelValues, labelPair.Value)
return
}
// MetricKeyDecoder implements storage.RecordDecoder for LabelPairs.
type MetricKeyDecoder struct{}
// DecodeKey implements storage.RecordDecoder. It requires 'in' to be a
// LabelPair protobuf. 'out' is a metric.LabelPair.
func (d *MetricKeyDecoder) DecodeKey(in interface{}) (out interface{}, err error) {
unmarshaled := dto.LabelPair{}
err = proto.Unmarshal(in.([]byte), &unmarshaled)
if err != nil {
return
}
out = LabelPair{
Name: clientmodel.LabelName(*unmarshaled.Name),
Value: clientmodel.LabelValue(*unmarshaled.Value),
}
return
}
// DecodeValue implements storage.RecordDecoder. It is a no-op and always
// returns (nil, nil).
func (d *MetricKeyDecoder) DecodeValue(in interface{}) (out interface{}, err error) {
return
}
// MetricSamplesDecoder implements storage.RecordDecoder for SampleKeys.
type MetricSamplesDecoder struct{}
// DecodeKey implements storage.RecordDecoder. It requires 'in' to be a
// SampleKey protobuf. 'out' is a metric.SampleKey.
func (d *MetricSamplesDecoder) DecodeKey(in interface{}) (interface{}, error) {
key := &dto.SampleKey{}
err := proto.Unmarshal(in.([]byte), key)
@ -678,6 +679,8 @@ func (d *MetricSamplesDecoder) DecodeKey(in interface{}) (interface{}, error) {
return sampleKey, nil
}
// DecodeValue implements storage.RecordDecoder. It requires 'in' to be a
// SampleValueSeries protobuf. 'out' is of type metric.Values.
func (d *MetricSamplesDecoder) DecodeValue(in interface{}) (interface{}, error) {
values := &dto.SampleValueSeries{}
err := proto.Unmarshal(in.([]byte), values)
@ -688,8 +691,27 @@ func (d *MetricSamplesDecoder) DecodeValue(in interface{}) (interface{}, error)
return NewValuesFromDTO(values), nil
}
// AcceptAllFilter implements storage.RecordFilter and accepts all records.
type AcceptAllFilter struct{}
// Filter implements storage.RecordFilter. It always returns ACCEPT.
func (d *AcceptAllFilter) Filter(_, _ interface{}) storage.FilterResult {
return storage.ACCEPT
return storage.Accept
}
// LabelNameFilter implements storage.RecordFilter and filters records matching
// a LabelName.
type LabelNameFilter struct {
labelName clientmodel.LabelName
}
// Filter implements storage.RecordFilter. 'key' is expected to be a
// LabelPair. The result is ACCEPT if the Name of the LabelPair matches the
// LabelName of this LabelNameFilter.
func (f LabelNameFilter) Filter(key, value interface{}) (filterResult storage.FilterResult) {
labelPair, ok := key.(LabelPair)
if ok && labelPair.Name == f.labelName {
return storage.Accept
}
return storage.Skip
}

View file

@ -22,24 +22,10 @@ import (
"github.com/prometheus/prometheus/utility"
)
// Assuming sample rate of 1 / 15Hz, this allows for one hour's worth of
// storage per metric without any major reallocations.
// An initialSeriesArenaSize of 4*60 allows for one hour's worth of storage per
// metric without any major reallocations - assuming a sample rate of 1 / 15Hz.
const initialSeriesArenaSize = 4 * 60
// Models a given sample entry stored in the in-memory arena.
type value interface {
// Gets the given value.
get() clientmodel.SampleValue
}
// Models a single sample value. It presumes that there is either no subsequent
// value seen or that any subsequent values are of a different value.
type singletonValue clientmodel.SampleValue
func (v singletonValue) get() clientmodel.SampleValue {
return clientmodel.SampleValue(v)
}
type stream interface {
add(...*SamplePair)
@ -194,9 +180,11 @@ type memorySeriesStorage struct {
labelNameToFingerprints map[clientmodel.LabelName]clientmodel.Fingerprints
}
// MemorySeriesOptions bundles options used by NewMemorySeriesStorage to create
// a memory series storage.
type MemorySeriesOptions struct {
// If provided, this WatermarkCache will be updated for any samples that are
// appended to the memorySeriesStorage.
// If provided, this WatermarkCache will be updated for any samples that
// are appended to the memorySeriesStorage.
WatermarkCache *watermarkCache
}
@ -485,6 +473,7 @@ func (s *memorySeriesStorage) GetAllValuesForLabel(labelName clientmodel.LabelNa
return
}
// NewMemorySeriesStorage returns a memory series storage ready to use.
func NewMemorySeriesStorage(o MemorySeriesOptions) *memorySeriesStorage {
return &memorySeriesStorage{
fingerprintToSeries: make(map[clientmodel.Fingerprint]stream),

View file

@ -22,7 +22,7 @@ import (
clientmodel "github.com/prometheus/client_golang/model"
)
// Encapsulates a primitive query operation.
// op encapsulates a primitive query operation.
type op interface {
// The time at which this operation starts.
StartsAt() clientmodel.Timestamp
@ -30,14 +30,14 @@ type op interface {
ExtractSamples(Values) Values
// Return whether the operator has consumed all data it needs.
Consumed() bool
// Get current operation time or nil if no subsequent work associated with
// this operator remains.
// Get current operation time or nil if no subsequent work associated
// with this operator remains.
CurrentTime() clientmodel.Timestamp
// GreedierThan indicates whether this present operation should take
// precedence over the other operation due to greediness.
//
// A critical assumption is that this operator and the other occur at the
// same time: this.StartsAt().Equal(op.StartsAt()).
// A critical assumption is that this operator and the other occur at
// the same time: this.StartsAt().Equal(op.StartsAt()).
GreedierThan(op) bool
}
@ -48,8 +48,8 @@ func (o ops) Len() int {
return len(o)
}
// startsAtSort implements the sorting protocol and allows operator to be sorted
// in chronological order by when they start.
// startsAtSort implements sort.Interface and allows operator to be sorted in
// chronological order by when they start.
type startsAtSort struct {
ops
}
@ -62,7 +62,8 @@ func (o ops) Swap(i, j int) {
o[i], o[j] = o[j], o[i]
}
// Encapsulates getting values at or adjacent to a specific time.
// getValuesAtTimeOp encapsulates getting values at or adjacent to a specific
// time.
type getValuesAtTimeOp struct {
time clientmodel.Timestamp
consumed bool
@ -112,15 +113,17 @@ func extractValuesAroundTime(t clientmodel.Timestamp, in Values) (out Values) {
out = in[len(in)-1:]
} else {
if in[i].Timestamp.Equal(t) && len(in) > i+1 {
// We hit exactly the current sample time. Very unlikely in practice.
// Return only the current sample.
// We hit exactly the current sample time. Very unlikely
// in practice. Return only the current sample.
out = in[i : i+1]
} else {
if i == 0 {
// We hit before the first sample time. Return only the first sample.
// We hit before the first sample time. Return
// only the first sample.
out = in[0:1]
} else {
// We hit between two samples. Return both surrounding samples.
// We hit between two samples. Return both
// surrounding samples.
out = in[i-1 : i+1]
}
}
@ -136,15 +139,16 @@ func (g getValuesAtTimeOp) Consumed() bool {
return g.consumed
}
// Encapsulates getting values at a given interval over a duration.
// getValuesAtIntervalOp encapsulates getting values at a given interval over a
// duration.
type getValuesAtIntervalOp struct {
from clientmodel.Timestamp
through clientmodel.Timestamp
interval time.Duration
}
func (o *getValuesAtIntervalOp) String() string {
return fmt.Sprintf("getValuesAtIntervalOp from %s each %s through %s", o.from, o.interval, o.through)
func (g *getValuesAtIntervalOp) String() string {
return fmt.Sprintf("getValuesAtIntervalOp from %s each %s through %s", g.from, g.interval, g.through)
}
func (g *getValuesAtIntervalOp) StartsAt() clientmodel.Timestamp {
@ -199,14 +203,14 @@ func (g *getValuesAtIntervalOp) GreedierThan(op op) (superior bool) {
return
}
// Encapsulates getting all values in a given range.
// getValuesAlongRangeOp encapsulates getting all values in a given range.
type getValuesAlongRangeOp struct {
from clientmodel.Timestamp
through clientmodel.Timestamp
}
func (o *getValuesAlongRangeOp) String() string {
return fmt.Sprintf("getValuesAlongRangeOp from %s through %s", o.from, o.through)
func (g *getValuesAlongRangeOp) String() string {
return fmt.Sprintf("getValuesAlongRangeOp from %s through %s", g.from, g.through)
}
func (g *getValuesAlongRangeOp) StartsAt() clientmodel.Timestamp {
@ -271,7 +275,8 @@ func (g *getValuesAlongRangeOp) GreedierThan(op op) (superior bool) {
return
}
// Encapsulates getting all values from ranges along intervals.
// getValueRangeAtIntervalOp encapsulates getting all values from ranges along
// intervals.
//
// Works just like getValuesAlongRangeOp, but when from > through, through is
// incremented by interval and from is reset to through-rangeDuration. Returns
@ -284,8 +289,8 @@ type getValueRangeAtIntervalOp struct {
through clientmodel.Timestamp
}
func (o *getValueRangeAtIntervalOp) String() string {
return fmt.Sprintf("getValueRangeAtIntervalOp range %s from %s each %s through %s", o.rangeDuration, o.rangeFrom, o.interval, o.through)
func (g *getValueRangeAtIntervalOp) String() string {
return fmt.Sprintf("getValueRangeAtIntervalOp range %s from %s each %s through %s", g.rangeDuration, g.rangeFrom, g.interval, g.through)
}
func (g *getValueRangeAtIntervalOp) StartsAt() clientmodel.Timestamp {

View file

@ -26,43 +26,51 @@ import (
dto "github.com/prometheus/prometheus/model/generated"
)
// processor models a post-processing agent that performs work given a sample
// Processor models a post-processing agent that performs work given a sample
// corpus.
type Processor interface {
// Name emits the name of this processor's signature encoder. It must be
// fully-qualified in the sense that it could be used via a Protocol Buffer
// registry to extract the descriptor to reassemble this message.
// Name emits the name of this processor's signature encoder. It must
// be fully-qualified in the sense that it could be used via a Protocol
// Buffer registry to extract the descriptor to reassemble this message.
Name() string
// Signature emits a byte signature for this process for the purpose of
// remarking how far along it has been applied to the database.
Signature() []byte
// Apply runs this processor against the sample set. sampleIterator expects
// to be pre-seeked to the initial starting position. The processor will
// run until up until stopAt has been reached. It is imperative that the
// provided stopAt is within the interval of the series frontier.
// Apply runs this processor against the sample set. sampleIterator
// expects to be pre-seeked to the initial starting position. The
// processor will run until up until stopAt has been reached. It is
// imperative that the provided stopAt is within the interval of the
// series frontier.
//
// Upon completion or error, the last time at which the processor finished
// shall be emitted in addition to any errors.
// Upon completion or error, the last time at which the processor
// finished shall be emitted in addition to any errors.
Apply(sampleIterator leveldb.Iterator, samplesPersistence raw.Persistence, stopAt clientmodel.Timestamp, fingerprint *clientmodel.Fingerprint) (lastCurated clientmodel.Timestamp, err error)
// Close reaps all of the underlying system resources associated with
// this processor.
Close()
}
// CompactionProcessor combines sparse values in the database together such
// that at least MinimumGroupSize-sized chunks are grouped together.
// CompactionProcessor combines sparse values in the database together such that
// at least MinimumGroupSize-sized chunks are grouped together. It implements
// the Processor interface.
type CompactionProcessor struct {
maximumMutationPoolBatch int
minimumGroupSize int
// signature is the byte representation of the CompactionProcessor's settings,
// used for purely memoization purposes across an instance.
// signature is the byte representation of the CompactionProcessor's
// settings, used for purely memoization purposes across an instance.
signature []byte
dtoSampleKeys *dtoSampleKeyList
sampleKeys *sampleKeyList
}
// Name implements the Processor interface. It returns
// "io.prometheus.CompactionProcessorDefinition".
func (p *CompactionProcessor) Name() string {
return "io.prometheus.CompactionProcessorDefinition"
}
// Signature implements the Processor interface.
func (p *CompactionProcessor) Signature() []byte {
if len(p.signature) == 0 {
out, err := proto.Marshal(&dto.CompactionProcessorDefinition{
@ -82,8 +90,9 @@ func (p *CompactionProcessor) String() string {
return fmt.Sprintf("compactionProcessor for minimum group size %d", p.minimumGroupSize)
}
// Apply implements the Processor interface.
func (p *CompactionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPersistence raw.Persistence, stopAt clientmodel.Timestamp, fingerprint *clientmodel.Fingerprint) (lastCurated clientmodel.Timestamp, err error) {
var pendingBatch raw.Batch = nil
var pendingBatch raw.Batch
defer func() {
if pendingBatch != nil {
@ -125,7 +134,7 @@ func (p *CompactionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPers
// block would prevent us from going into unsafe territory.
case len(unactedSamples) == 0:
if !sampleIterator.Next() {
return lastCurated, fmt.Errorf("Illegal Condition: Invalid Iterator on Continuation")
return lastCurated, fmt.Errorf("illegal condition: invalid iterator on continuation")
}
keyDropped = false
@ -163,7 +172,7 @@ func (p *CompactionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPers
case len(pendingSamples)+len(unactedSamples) < p.minimumGroupSize:
if !keyDropped {
k := new(dto.SampleKey)
k := &dto.SampleKey{}
sampleKey.Dump(k)
pendingBatch.Drop(k)
@ -176,10 +185,10 @@ func (p *CompactionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPers
// If the number of pending writes equals the target group size
case len(pendingSamples) == p.minimumGroupSize:
k := new(dto.SampleKey)
k := &dto.SampleKey{}
newSampleKey := pendingSamples.ToSampleKey(fingerprint)
newSampleKey.Dump(k)
b := new(dto.SampleValueSeries)
b := &dto.SampleValueSeries{}
pendingSamples.dump(b)
pendingBatch.Put(k, b)
@ -205,7 +214,7 @@ func (p *CompactionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPers
case len(pendingSamples)+len(unactedSamples) >= p.minimumGroupSize:
if !keyDropped {
k := new(dto.SampleKey)
k := &dto.SampleKey{}
sampleKey.Dump(k)
pendingBatch.Drop(k)
keyDropped = true
@ -220,16 +229,16 @@ func (p *CompactionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPers
}
pendingMutations++
default:
err = fmt.Errorf("Unhandled processing case.")
err = fmt.Errorf("unhandled processing case")
}
}
if len(unactedSamples) > 0 || len(pendingSamples) > 0 {
pendingSamples = append(pendingSamples, unactedSamples...)
k := new(dto.SampleKey)
k := &dto.SampleKey{}
newSampleKey := pendingSamples.ToSampleKey(fingerprint)
newSampleKey.Dump(k)
b := new(dto.SampleValueSeries)
b := &dto.SampleValueSeries{}
pendingSamples.dump(b)
pendingBatch.Put(k, b)
pendingSamples = Values{}
@ -249,11 +258,14 @@ func (p *CompactionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPers
return
}
// Close implements the Processor interface.
func (p *CompactionProcessor) Close() {
p.dtoSampleKeys.Close()
p.sampleKeys.Close()
}
// CompactionProcessorOptions are used for connstruction of a
// CompactionProcessor.
type CompactionProcessorOptions struct {
// MaximumMutationPoolBatch represents approximately the largest pending
// batch of mutation operations for the database before pausing to
@ -266,6 +278,7 @@ type CompactionProcessorOptions struct {
MinimumGroupSize int
}
// NewCompactionProcessor returns a CompactionProcessor ready to use.
func NewCompactionProcessor(o *CompactionProcessorOptions) *CompactionProcessor {
return &CompactionProcessor{
maximumMutationPoolBatch: o.MaximumMutationPoolBatch,
@ -276,7 +289,8 @@ func NewCompactionProcessor(o *CompactionProcessorOptions) *CompactionProcessor
}
}
// DeletionProcessor deletes sample blocks older than a defined value.
// DeletionProcessor deletes sample blocks older than a defined value. It
// implements the Processor interface.
type DeletionProcessor struct {
maximumMutationPoolBatch int
// signature is the byte representation of the DeletionProcessor's settings,
@ -287,10 +301,13 @@ type DeletionProcessor struct {
sampleKeys *sampleKeyList
}
// Name implements the Processor interface. It returns
// "io.prometheus.DeletionProcessorDefinition".
func (p *DeletionProcessor) Name() string {
return "io.prometheus.DeletionProcessorDefinition"
}
// Signature implements the Processor interface.
func (p *DeletionProcessor) Signature() []byte {
if len(p.signature) == 0 {
out, err := proto.Marshal(&dto.DeletionProcessorDefinition{})
@ -309,8 +326,9 @@ func (p *DeletionProcessor) String() string {
return "deletionProcessor"
}
// Apply implements the Processor interface.
func (p *DeletionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPersistence raw.Persistence, stopAt clientmodel.Timestamp, fingerprint *clientmodel.Fingerprint) (lastCurated clientmodel.Timestamp, err error) {
var pendingBatch raw.Batch = nil
var pendingBatch raw.Batch
defer func() {
if pendingBatch != nil {
@ -342,12 +360,13 @@ func (p *DeletionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPersis
case pendingBatch == nil:
pendingBatch = leveldb.NewBatch()
// If there are no sample values to extract from the datastore, let's
// continue extracting more values to use. We know that the time.Before()
// block would prevent us from going into unsafe territory.
// If there are no sample values to extract from the datastore,
// let's continue extracting more values to use. We know that
// the time.Before() block would prevent us from going into
// unsafe territory.
case len(sampleValues) == 0:
if !sampleIterator.Next() {
return lastCurated, fmt.Errorf("Illegal Condition: Invalid Iterator on Continuation")
return lastCurated, fmt.Errorf("illegal condition: invalid iterator on continuation")
}
if err = sampleIterator.Key(sampleKeyDto); err != nil {
@ -360,9 +379,9 @@ func (p *DeletionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPersis
return
}
// If the number of pending mutations exceeds the allowed batch amount,
// commit to disk and delete the batch. A new one will be recreated if
// necessary.
// If the number of pending mutations exceeds the allowed batch
// amount, commit to disk and delete the batch. A new one will
// be recreated if necessary.
case pendingMutations >= p.maximumMutationPoolBatch:
err = samplesPersistence.Commit(pendingBatch)
if err != nil {
@ -403,7 +422,7 @@ func (p *DeletionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPersis
}
default:
err = fmt.Errorf("Unhandled processing case.")
err = fmt.Errorf("unhandled processing case")
}
}
@ -419,11 +438,13 @@ func (p *DeletionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPersis
return
}
// Close implements the Processor interface.
func (p *DeletionProcessor) Close() {
p.dtoSampleKeys.Close()
p.sampleKeys.Close()
}
// DeletionProcessorOptions are used for connstruction of a DeletionProcessor.
type DeletionProcessorOptions struct {
// MaximumMutationPoolBatch represents approximately the largest pending
// batch of mutation operations for the database before pausing to
@ -431,6 +452,7 @@ type DeletionProcessorOptions struct {
MaximumMutationPoolBatch int
}
// NewDeletionProcessor returns a DeletionProcessor ready to use.
func NewDeletionProcessor(o *DeletionProcessorOptions) *DeletionProcessor {
return &DeletionProcessor{
maximumMutationPoolBatch: o.MaximumMutationPoolBatch,

View file

@ -112,7 +112,7 @@ func (s sampleGroup) Get() (key, value proto.Message) {
return k, v
}
type noopUpdater bool
type noopUpdater struct{}
func (noopUpdater) UpdateCurationState(*CurationState) {}
@ -876,7 +876,7 @@ func TestCuratorCompactionProcessor(t *testing.T) {
}
defer samples.Close()
updates := new(noopUpdater)
updates := &noopUpdater{}
stop := make(chan bool)
defer close(stop)
@ -891,7 +891,7 @@ func TestCuratorCompactionProcessor(t *testing.T) {
t.Fatal(err)
}
iterator, err := curatorStates.p.NewIterator(true)
iterator, err := curatorStates.LevelDBPersistence.NewIterator(true)
if err != nil {
t.Fatal(err)
}
@ -909,12 +909,12 @@ func TestCuratorCompactionProcessor(t *testing.T) {
}
}
curationKeyDto := new(dto.CurationKey)
curationKeyDto := &dto.CurationKey{}
err = iterator.Key(curationKeyDto)
if err != nil {
t.Fatalf("%d.%d. could not unmarshal: %s", i, j, err)
}
actualKey := new(curationKey)
actualKey := &curationKey{}
actualKey.load(curationKeyDto)
actualValue, present, err := curatorStates.Get(actualKey)
@ -988,7 +988,7 @@ func TestCuratorCompactionProcessor(t *testing.T) {
for k, actualValue := range sampleValues {
if expected.values[k].Value != actualValue.Value {
t.Fatalf("%d.%d.%d. expected %d, got %d", i, j, k, expected.values[k].Value, actualValue.Value)
t.Fatalf("%d.%d.%d. expected %v, got %v", i, j, k, expected.values[k].Value, actualValue.Value)
}
if !expected.values[k].Timestamp.Equal(actualValue.Timestamp) {
t.Fatalf("%d.%d.%d. expected %s, got %s", i, j, k, expected.values[k].Timestamp, actualValue.Timestamp)
@ -1405,7 +1405,7 @@ func TestCuratorDeletionProcessor(t *testing.T) {
}
defer samples.Close()
updates := new(noopUpdater)
updates := &noopUpdater{}
stop := make(chan bool)
defer close(stop)
@ -1420,7 +1420,7 @@ func TestCuratorDeletionProcessor(t *testing.T) {
t.Fatal(err)
}
iterator, err := curatorStates.p.NewIterator(true)
iterator, err := curatorStates.LevelDBPersistence.NewIterator(true)
if err != nil {
t.Fatal(err)
}
@ -1438,12 +1438,12 @@ func TestCuratorDeletionProcessor(t *testing.T) {
}
}
curationKeyDto := new(dto.CurationKey)
curationKeyDto := &dto.CurationKey{}
if err := iterator.Key(curationKeyDto); err != nil {
t.Fatalf("%d.%d. could not unmarshal: %s", i, j, err)
}
actualKey := new(curationKey)
actualKey := &curationKey{}
actualKey.load(curationKeyDto)
signature := expected.processor.Signature()
@ -1518,7 +1518,7 @@ func TestCuratorDeletionProcessor(t *testing.T) {
for k, actualValue := range sampleValues {
if expected.values[k].Value != actualValue.Value {
t.Fatalf("%d.%d.%d. expected %d, got %d", i, j, k, expected.values[k].Value, actualValue.Value)
t.Fatalf("%d.%d.%d. expected %v, got %v", i, j, k, expected.values[k].Value, actualValue.Value)
}
if !expected.values[k].Timestamp.Equal(actualValue.Timestamp) {
t.Fatalf("%d.%d.%d. expected %s, got %s", i, j, k, expected.values[k].Timestamp, actualValue.Timestamp)

View file

@ -22,7 +22,7 @@ import (
"github.com/prometheus/prometheus/utility/test"
)
func GetValueAtTimeTests(persistenceMaker func() (MetricPersistence, test.Closer), t test.Tester) {
func GetValueAtTimeTests(persistenceMaker func() (ViewableMetricPersistence, test.Closer), t test.Tester) {
type value struct {
year int
month time.Month
@ -355,7 +355,7 @@ func GetValueAtTimeTests(persistenceMaker func() (MetricPersistence, test.Closer
}
}
func GetRangeValuesTests(persistenceMaker func() (MetricPersistence, test.Closer), onlyBoundaries bool, t test.Tester) {
func GetRangeValuesTests(persistenceMaker func() (ViewableMetricPersistence, test.Closer), onlyBoundaries bool, t test.Tester) {
type value struct {
year int
month time.Month
@ -854,7 +854,7 @@ func GetRangeValuesTests(persistenceMaker func() (MetricPersistence, test.Closer
}
if actualValues == nil && len(expectedValues) != 0 {
t.Fatalf("%d.%d(%s). Expected %s but got: %s\n", i, j, behavior.name, expectedValues, actualValues)
t.Fatalf("%d.%d(%s). Expected %v but got: %v\n", i, j, behavior.name, expectedValues, actualValues)
}
if expectedValues == nil {
@ -899,7 +899,7 @@ func GetRangeValuesTests(persistenceMaker func() (MetricPersistence, test.Closer
// Test Definitions Follow
func testMemoryGetValueAtTime(t test.Tester) {
persistenceMaker := func() (MetricPersistence, test.Closer) {
persistenceMaker := func() (ViewableMetricPersistence, test.Closer) {
return NewMemorySeriesStorage(MemorySeriesOptions{}), test.NilCloser
}
@ -927,7 +927,7 @@ func BenchmarkMemoryGetBoundaryValues(b *testing.B) {
}
func testMemoryGetRangeValues(t test.Tester) {
persistenceMaker := func() (MetricPersistence, test.Closer) {
persistenceMaker := func() (ViewableMetricPersistence, test.Closer) {
return NewMemorySeriesStorage(MemorySeriesOptions{}), test.NilCloser
}
@ -935,7 +935,7 @@ func testMemoryGetRangeValues(t test.Tester) {
}
func testMemoryGetBoundaryValues(t test.Tester) {
persistenceMaker := func() (MetricPersistence, test.Closer) {
persistenceMaker := func() (ViewableMetricPersistence, test.Closer) {
return NewMemorySeriesStorage(MemorySeriesOptions{}), test.NilCloser
}

View file

@ -25,15 +25,19 @@ import (
dto "github.com/prometheus/prometheus/model/generated"
)
// MarshalJSON implements json.Marshaler.
func (s SamplePair) MarshalJSON() ([]byte, error) {
return []byte(fmt.Sprintf("{\"Value\": \"%f\", \"Timestamp\": %d}", s.Value, s.Timestamp)), nil
}
// SamplePair pairs a SampleValue with a Timestamp.
type SamplePair struct {
Value clientmodel.SampleValue
Timestamp clientmodel.Timestamp
}
// Equal returns true if this SamplePair and o have equal Values and equal
// Timestamps.
func (s *SamplePair) Equal(o *SamplePair) bool {
if s == o {
return true
@ -54,20 +58,27 @@ func (s *SamplePair) String() string {
return fmt.Sprintf("SamplePair at %s of %s", s.Timestamp, s.Value)
}
// Values is a sortable slice of SamplePair pointers (as in: it implements
// sort.Interface). Sorting happens by Timestamp.
type Values []*SamplePair
// Len implements sort.Interface.
func (v Values) Len() int {
return len(v)
}
// Less implements sort.Interface.
func (v Values) Less(i, j int) bool {
return v[i].Timestamp.Before(v[j].Timestamp)
}
// Swap implements sort.Interface.
func (v Values) Swap(i, j int) {
v[i], v[j] = v[j], v[i]
}
// Equal returns true if these Values are of the same length as o, and each
// value is equal to the corresponding value in o (i.e. at the same index).
func (v Values) Equal(o Values) bool {
if len(v) != len(o) {
return false
@ -132,6 +143,7 @@ func (v Values) dump(d *dto.SampleValueSeries) {
}
}
// ToSampleKey returns the SampleKey for these Values.
func (v Values) ToSampleKey(f *clientmodel.Fingerprint) *SampleKey {
return &SampleKey{
Fingerprint: f,
@ -156,9 +168,10 @@ func (v Values) String() string {
return buffer.String()
}
// NewValuesFromDTO deserializes Values from a DTO.
func NewValuesFromDTO(d *dto.SampleValueSeries) Values {
// BUG(matt): Incogruent from the other load/dump API types, but much more
// performant.
// BUG(matt): Incogruent from the other load/dump API types, but much
// more performant.
v := make(Values, 0, len(d.Value))
for _, value := range d.Value {
@ -171,11 +184,13 @@ func NewValuesFromDTO(d *dto.SampleValueSeries) Values {
return v
}
// SampleSet is Values with a Metric attached.
type SampleSet struct {
Metric clientmodel.Metric
Values Values
}
// Interval describes the inclusive interval between two Timestamps.
type Interval struct {
OldestInclusive clientmodel.Timestamp
NewestInclusive clientmodel.Timestamp

View file

@ -49,6 +49,8 @@ func (s *SampleKey) Constrain(first, last *SampleKey) bool {
}
}
// Equal returns true if this SampleKey and o have equal fingerprints,
// timestamps, and sample counts.
func (s *SampleKey) Equal(o *SampleKey) bool {
if s == o {
return true
@ -81,6 +83,11 @@ func (s *SampleKey) MayContain(t clientmodel.Timestamp) bool {
}
}
// Before returns true if the Fingerprint of this SampleKey is less than fp and
// false if it is greater. If both fingerprints are equal, the FirstTimestamp of
// this SampleKey is checked in the same way against t. If the timestamps are
// eqal, the LastTimestamp of this SampleKey is checked against t (and false is
// returned if they are equal again).
func (s *SampleKey) Before(fp *clientmodel.Fingerprint, t clientmodel.Timestamp) bool {
if s.Fingerprint.Less(fp) {
return true
@ -96,7 +103,7 @@ func (s *SampleKey) Before(fp *clientmodel.Fingerprint, t clientmodel.Timestamp)
return s.LastTimestamp.Before(t)
}
// ToDTO converts this SampleKey into a DTO for use in serialization purposes.
// Dump converts this SampleKey into a DTO for use in serialization purposes.
func (s *SampleKey) Dump(d *dto.SampleKey) {
d.Reset()
fp := &dto.Fingerprint{}
@ -112,6 +119,7 @@ func (s *SampleKey) String() string {
return fmt.Sprintf("SampleKey for %s at %s to %s with %d values.", s.Fingerprint, s.FirstTimestamp, s.LastTimestamp, s.SampleCount)
}
// Load deserializes this SampleKey from a DTO.
func (s *SampleKey) Load(d *dto.SampleKey) {
f := &clientmodel.Fingerprint{}
loadFingerprint(f, d.GetFingerprint())

View file

@ -422,8 +422,8 @@ func StochasticTests(persistenceMaker func() (MetricPersistence, test.Closer), t
for i := 0; i < numberOfRangeScans; i++ {
timestamps := metricTimestamps[metricIndex]
var first int64 = 0
var second int64 = 0
var first int64
var second int64
for {
firstCandidate := random.Int63n(int64(len(timestamps)))
@ -472,6 +472,11 @@ func StochasticTests(persistenceMaker func() (MetricPersistence, test.Closer), t
fp := &clientmodel.Fingerprint{}
fp.LoadFromMetric(metric)
switch persistence := p.(type) {
case View:
samples = persistence.GetRangeValues(fp, interval)
if len(samples) < 2 {
t.Fatalf("expected sample count greater than %d, got %d", 2, len(samples))
}
case *LevelDBMetricPersistence:
var err error
samples, err = levelDBGetRangeValues(persistence, fp, interval)
@ -482,10 +487,7 @@ func StochasticTests(persistenceMaker func() (MetricPersistence, test.Closer), t
t.Fatalf("expected sample count greater than %d, got %d", 2, len(samples))
}
default:
samples = p.GetRangeValues(fp, interval)
if len(samples) < 2 {
t.Fatalf("expected sample count greater than %d, got %d", 2, len(samples))
}
t.Error("Unexpected type of MetricPersistence.")
}
}
}

View file

@ -114,10 +114,11 @@ const (
const watermarkCacheLimit = 1024 * 1024
// NewTieredStorage returns a TieredStorage object ready to use.
func NewTieredStorage(appendToDiskQueueDepth, viewQueueDepth uint, flushMemoryInterval time.Duration, memoryTTL time.Duration, rootDirectory string) (*TieredStorage, error) {
if isDir, _ := utility.IsDir(rootDirectory); !isDir {
if err := os.MkdirAll(rootDirectory, 0755); err != nil {
return nil, fmt.Errorf("Could not find or create metrics directory %s: %s", rootDirectory, err)
return nil, fmt.Errorf("could not find or create metrics directory %s: %s", rootDirectory, err)
}
}
@ -160,12 +161,12 @@ func NewTieredStorage(appendToDiskQueueDepth, viewQueueDepth uint, flushMemoryIn
return s, nil
}
// Enqueues Samples for storage.
// AppendSamples enqueues Samples for storage.
func (t *TieredStorage) AppendSamples(samples clientmodel.Samples) (err error) {
t.mu.RLock()
defer t.mu.RUnlock()
if t.state != tieredStorageServing {
return fmt.Errorf("Storage is not serving.")
return fmt.Errorf("storage is not serving")
}
t.memoryArena.AppendSamples(samples)
@ -174,7 +175,7 @@ func (t *TieredStorage) AppendSamples(samples clientmodel.Samples) (err error) {
return
}
// Stops the storage subsystem, flushing all pending operations.
// Drain stops the storage subsystem, flushing all pending operations.
func (t *TieredStorage) Drain(drained chan<- bool) {
t.mu.Lock()
defer t.mu.Unlock()
@ -193,20 +194,22 @@ func (t *TieredStorage) drain(drained chan<- bool) {
t.draining <- (drained)
}
// Materializes a View according to a ViewRequestBuilder, subject to a timeout.
// MakeView materializes a View according to a ViewRequestBuilder, subject to a
// timeout.
func (t *TieredStorage) MakeView(builder ViewRequestBuilder, deadline time.Duration, queryStats *stats.TimerGroup) (View, error) {
t.mu.RLock()
defer t.mu.RUnlock()
if t.state != tieredStorageServing {
return nil, fmt.Errorf("Storage is not serving")
return nil, fmt.Errorf("storage is not serving")
}
// The result channel needs a one-element buffer in case we have timed out in
// MakeView, but the view rendering still completes afterwards and writes to
// the channel.
// The result channel needs a one-element buffer in case we have timed
// out in MakeView, but the view rendering still completes afterwards
// and writes to the channel.
result := make(chan View, 1)
// The abort channel needs a one-element buffer in case the view rendering
// has already exited and doesn't consume from the channel anymore.
// The abort channel needs a one-element buffer in case the view
// rendering has already exited and doesn't consume from the channel
// anymore.
abortChan := make(chan bool, 1)
errChan := make(chan error)
queryStats.GetTimer(stats.ViewQueueTime).Start()
@ -225,11 +228,11 @@ func (t *TieredStorage) MakeView(builder ViewRequestBuilder, deadline time.Durat
return nil, err
case <-time.After(deadline):
abortChan <- true
return nil, fmt.Errorf("MakeView timed out after %s.", deadline)
return nil, fmt.Errorf("fetching query data timed out after %s", deadline)
}
}
// Starts serving requests.
// Serve starts serving requests.
func (t *TieredStorage) Serve(started chan<- bool) {
t.mu.Lock()
if t.state != tieredStorageStarting {
@ -284,6 +287,7 @@ func (t *TieredStorage) reportQueues() {
queueSizes.Set(map[string]string{"queue": "view_generation", "facet": "capacity"}, float64(cap(t.ViewQueue)))
}
// Flush flushes all samples to disk.
func (t *TieredStorage) Flush() {
t.flushSema <- true
t.flushMemory(0)
@ -311,6 +315,7 @@ func (t *TieredStorage) flushMemory(ttl time.Duration) {
glog.Info("Done flushing.")
}
// Close stops serving, flushes all pending operations, and frees all resources.
func (t *TieredStorage) Close() {
t.mu.Lock()
defer t.mu.Unlock()
@ -329,8 +334,8 @@ func (t *TieredStorage) close() {
t.memoryArena.Close()
t.DiskStorage.Close()
// BUG(matt): There is a probability that pending items may hang here and not
// get flushed.
// BUG(matt): There is a probability that pending items may hang here
// and not get flushed.
close(t.appendToDiskQueue)
close(t.ViewQueue)
t.wmCache.Clear()
@ -639,7 +644,8 @@ func (t *TieredStorage) loadChunkAroundTime(iterator leveldb.Iterator, fingerpri
panic("illegal state: violated sort invariant")
}
// Get all label values that are associated with the provided label name.
// GetAllValuesForLabel gets all label values that are associated with the
// provided label name.
func (t *TieredStorage) GetAllValuesForLabel(labelName clientmodel.LabelName) (clientmodel.LabelValues, error) {
t.mu.RLock()
defer t.mu.RUnlock()
@ -669,8 +675,8 @@ func (t *TieredStorage) GetAllValuesForLabel(labelName clientmodel.LabelName) (c
return values, nil
}
// Get all of the metric fingerprints that are associated with the provided
// label set.
// GetFingerprintsForLabelSet gets all of the metric fingerprints that are
// associated with the provided label set.
func (t *TieredStorage) GetFingerprintsForLabelSet(labelSet clientmodel.LabelSet) (clientmodel.Fingerprints, error) {
t.mu.RLock()
defer t.mu.RUnlock()
@ -700,7 +706,8 @@ func (t *TieredStorage) GetFingerprintsForLabelSet(labelSet clientmodel.LabelSet
return fingerprints, nil
}
// Get the metric associated with the provided fingerprint.
// GetMetricForFingerprint gets the metric associated with the provided
// fingerprint.
func (t *TieredStorage) GetMetricForFingerprint(f *clientmodel.Fingerprint) (clientmodel.Metric, error) {
t.mu.RLock()
defer t.mu.RUnlock()

View file

@ -27,8 +27,9 @@ var (
lastSupertime = []byte{127, 255, 255, 255, 255, 255, 255, 255}
)
// Represents the summation of all datastore queries that shall be performed to
// extract values. Each operation mutates the state of the builder.
// ViewRequestBuilder represents the summation of all datastore queries that
// shall be performed to extract values. Each operation mutates the state of
// the builder.
type ViewRequestBuilder interface {
GetMetricAtTime(fingerprint *clientmodel.Fingerprint, time clientmodel.Timestamp)
GetMetricAtInterval(fingerprint *clientmodel.Fingerprint, from, through clientmodel.Timestamp, interval time.Duration)
@ -36,12 +37,13 @@ type ViewRequestBuilder interface {
ScanJobs() scanJobs
}
// Contains the various unoptimized requests for data.
// viewRequestBuilder contains the various unoptimized requests for data.
type viewRequestBuilder struct {
operations map[clientmodel.Fingerprint]ops
}
// Furnishes a ViewRequestBuilder for remarking what types of queries to perform.
// NewViewRequestBuilder furnishes a ViewRequestBuilder for remarking what types
// of queries to perform.
func NewViewRequestBuilder() *viewRequestBuilder {
return &viewRequestBuilder{
operations: make(map[clientmodel.Fingerprint]ops),
@ -50,8 +52,8 @@ func NewViewRequestBuilder() *viewRequestBuilder {
var getValuesAtTimes = newValueAtTimeList(10 * 1024)
// Gets for the given Fingerprint either the value at that time if there is an
// match or the one or two values adjacent thereto.
// GetMetricAtTime gets for the given Fingerprint either the value at that time
// if there is an match or the one or two values adjacent thereto.
func (v *viewRequestBuilder) GetMetricAtTime(fingerprint *clientmodel.Fingerprint, time clientmodel.Timestamp) {
ops := v.operations[*fingerprint]
op, _ := getValuesAtTimes.Get()
@ -62,9 +64,9 @@ func (v *viewRequestBuilder) GetMetricAtTime(fingerprint *clientmodel.Fingerprin
var getValuesAtIntervals = newValueAtIntervalList(10 * 1024)
// Gets for the given Fingerprint either the value at that interval from From
// through Through if there is an match or the one or two values adjacent
// for each point.
// GetMetricAtInterval gets for the given Fingerprint either the value at that
// interval from From through Through if there is an match or the one or two
// values adjacent for each point.
func (v *viewRequestBuilder) GetMetricAtInterval(fingerprint *clientmodel.Fingerprint, from, through clientmodel.Timestamp, interval time.Duration) {
ops := v.operations[*fingerprint]
op, _ := getValuesAtIntervals.Get()
@ -77,8 +79,8 @@ func (v *viewRequestBuilder) GetMetricAtInterval(fingerprint *clientmodel.Finger
var getValuesAlongRanges = newValueAlongRangeList(10 * 1024)
// Gets for the given Fingerprint the values that occur inclusively from From
// through Through.
// GetMetricRange gets for the given Fingerprint the values that occur
// inclusively from From through Through.
func (v *viewRequestBuilder) GetMetricRange(fingerprint *clientmodel.Fingerprint, from, through clientmodel.Timestamp) {
ops := v.operations[*fingerprint]
op, _ := getValuesAlongRanges.Get()
@ -90,7 +92,8 @@ func (v *viewRequestBuilder) GetMetricRange(fingerprint *clientmodel.Fingerprint
var getValuesAtIntervalAlongRanges = newValueAtIntervalAlongRangeList(10 * 1024)
// Gets value ranges at intervals for the given Fingerprint:
// GetMetricRangeAtInterval gets value ranges at intervals for the given
// Fingerprint:
//
// |----| |----| |----| |----|
// ^ ^ ^ ^ ^ ^
@ -108,7 +111,7 @@ func (v *viewRequestBuilder) GetMetricRangeAtInterval(fingerprint *clientmodel.F
v.operations[*fingerprint] = ops
}
// Emits the optimized scans that will occur in the data store. This
// ScanJobs emits the optimized scans that will occur in the data store. This
// effectively resets the ViewRequestBuilder back to a pristine state.
func (v *viewRequestBuilder) ScanJobs() (j scanJobs) {
for fingerprint, operations := range v.operations {

View file

@ -17,8 +17,6 @@ import (
"code.google.com/p/goprotobuf/proto"
clientmodel "github.com/prometheus/client_golang/model"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/storage/raw"
"github.com/prometheus/prometheus/storage/raw/leveldb"
"github.com/prometheus/prometheus/utility"
@ -40,27 +38,32 @@ func (w *watermarks) dump(d *dto.MetricHighWatermark) {
d.Timestamp = proto.Int64(w.High.Unix())
}
// A FingerprintHighWatermarkMapping is used for batch updates of many high
// watermarks in a database.
type FingerprintHighWatermarkMapping map[clientmodel.Fingerprint]clientmodel.Timestamp
// HighWatermarker models a high-watermark database.
type HighWatermarker interface {
raw.Database
raw.ForEacher
raw.Pruner
UpdateBatch(FingerprintHighWatermarkMapping) error
Get(*clientmodel.Fingerprint) (t clientmodel.Timestamp, ok bool, err error)
State() *raw.DatabaseState
Size() (uint64, bool, error)
}
// LevelDBHighWatermarker is an implementation of HighWatermarker backed by
// leveldb.
type LevelDBHighWatermarker struct {
p *leveldb.LevelDBPersistence
*leveldb.LevelDBPersistence
}
// Get implements HighWatermarker.
func (w *LevelDBHighWatermarker) Get(f *clientmodel.Fingerprint) (t clientmodel.Timestamp, ok bool, err error) {
k := new(dto.Fingerprint)
k := &dto.Fingerprint{}
dumpFingerprint(k, f)
v := new(dto.MetricHighWatermark)
ok, err = w.p.Get(k, v)
v := &dto.MetricHighWatermark{}
ok, err = w.LevelDBPersistence.Get(k, v)
if err != nil {
return t, ok, err
}
@ -71,6 +74,7 @@ func (w *LevelDBHighWatermarker) Get(f *clientmodel.Fingerprint) (t clientmodel.
return t, true, nil
}
// UpdateBatch implements HighWatermarker.
func (w *LevelDBHighWatermarker) UpdateBatch(m FingerprintHighWatermarkMapping) error {
batch := leveldb.NewBatch()
defer batch.Close()
@ -80,9 +84,9 @@ func (w *LevelDBHighWatermarker) UpdateBatch(m FingerprintHighWatermarkMapping)
if err != nil {
return err
}
k := new(dto.Fingerprint)
k := &dto.Fingerprint{}
dumpFingerprint(k, &fp)
v := new(dto.MetricHighWatermark)
v := &dto.MetricHighWatermark{}
if !present {
v.Timestamp = proto.Int64(t.Unix())
batch.Put(k, v)
@ -97,36 +101,15 @@ func (w *LevelDBHighWatermarker) UpdateBatch(m FingerprintHighWatermarkMapping)
}
}
return w.p.Commit(batch)
}
func (w *LevelDBHighWatermarker) ForEach(d storage.RecordDecoder, f storage.RecordFilter, o storage.RecordOperator) (bool, error) {
return w.p.ForEach(d, f, o)
}
func (w *LevelDBHighWatermarker) Prune() (bool, error) {
w.p.Prune()
return false, nil
}
func (w *LevelDBHighWatermarker) Close() {
w.p.Close()
}
func (w *LevelDBHighWatermarker) State() *raw.DatabaseState {
return w.p.State()
}
func (w *LevelDBHighWatermarker) Size() (uint64, bool, error) {
s, err := w.p.Size()
return s, true, err
return w.LevelDBPersistence.Commit(batch)
}
// LevelDBHighWatermarkerOptions just wraps leveldb.LevelDBOptions.
type LevelDBHighWatermarkerOptions struct {
leveldb.LevelDBOptions
}
// NewLevelDBHighWatermarker returns a LevelDBHighWatermarker ready to use.
func NewLevelDBHighWatermarker(o LevelDBHighWatermarkerOptions) (*LevelDBHighWatermarker, error) {
s, err := leveldb.NewLevelDBPersistence(o.LevelDBOptions)
if err != nil {
@ -134,52 +117,37 @@ func NewLevelDBHighWatermarker(o LevelDBHighWatermarkerOptions) (*LevelDBHighWat
}
return &LevelDBHighWatermarker{
p: s,
LevelDBPersistence: s,
}, nil
}
// CurationRemarker models a curation remarker database.
type CurationRemarker interface {
raw.Database
raw.Pruner
Update(*curationKey, clientmodel.Timestamp) error
Get(*curationKey) (t clientmodel.Timestamp, ok bool, err error)
State() *raw.DatabaseState
Size() (uint64, bool, error)
}
// LevelDBCurationRemarker is an implementation of CurationRemarker backed by
// leveldb.
type LevelDBCurationRemarker struct {
p *leveldb.LevelDBPersistence
*leveldb.LevelDBPersistence
}
// LevelDBCurationRemarkerOptions just wraps leveldb.LevelDBOptions.
type LevelDBCurationRemarkerOptions struct {
leveldb.LevelDBOptions
}
func (w *LevelDBCurationRemarker) State() *raw.DatabaseState {
return w.p.State()
}
func (w *LevelDBCurationRemarker) Size() (uint64, bool, error) {
s, err := w.p.Size()
return s, true, err
}
func (w *LevelDBCurationRemarker) Close() {
w.p.Close()
}
func (w *LevelDBCurationRemarker) Prune() (bool, error) {
w.p.Prune()
return false, nil
}
// Get implements CurationRemarker.
func (w *LevelDBCurationRemarker) Get(c *curationKey) (t clientmodel.Timestamp, ok bool, err error) {
k := new(dto.CurationKey)
k := &dto.CurationKey{}
c.dump(k)
v := new(dto.CurationValue)
v := &dto.CurationValue{}
ok, err = w.p.Get(k, v)
ok, err = w.LevelDBPersistence.Get(k, v)
if err != nil || !ok {
return clientmodel.TimestampFromUnix(0), ok, err
}
@ -187,15 +155,17 @@ func (w *LevelDBCurationRemarker) Get(c *curationKey) (t clientmodel.Timestamp,
return clientmodel.TimestampFromUnix(v.GetLastCompletionTimestamp()), true, nil
}
// Update implements CurationRemarker.
func (w *LevelDBCurationRemarker) Update(pair *curationKey, t clientmodel.Timestamp) error {
k := new(dto.CurationKey)
k := &dto.CurationKey{}
pair.dump(k)
return w.p.Put(k, &dto.CurationValue{
return w.LevelDBPersistence.Put(k, &dto.CurationValue{
LastCompletionTimestamp: proto.Int64(t.Unix()),
})
}
// NewLevelDBCurationRemarker returns a LevelDBCurationRemarker ready to use.
func NewLevelDBCurationRemarker(o LevelDBCurationRemarkerOptions) (*LevelDBCurationRemarker, error) {
s, err := leveldb.NewLevelDBPersistence(o.LevelDBOptions)
if err != nil {
@ -203,7 +173,7 @@ func NewLevelDBCurationRemarker(o LevelDBCurationRemarkerOptions) (*LevelDBCurat
}
return &LevelDBCurationRemarker{
p: s,
LevelDBPersistence: s,
}, nil
}

View file

@ -1,23 +0,0 @@
// Copyright 2013 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package index
import "code.google.com/p/goprotobuf/proto"
type MembershipIndex interface {
Has(key proto.Message) (bool, error)
Put(key proto.Message) error
Drop(key proto.Message) error
Close()
}

View file

@ -1,23 +0,0 @@
// Copyright 2013 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package leveldb
import (
"github.com/prometheus/prometheus/storage/raw/index"
"testing"
)
func TestInterfaceAdherence(t *testing.T) {
var _ index.MembershipIndex = &LevelDBMembershipIndex{}
}

View file

@ -1,80 +0,0 @@
// Copyright 2013 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package leveldb
import (
"code.google.com/p/goprotobuf/proto"
"github.com/prometheus/prometheus/storage/raw"
"github.com/prometheus/prometheus/storage/raw/leveldb"
dto "github.com/prometheus/prometheus/model/generated"
)
var existenceValue = new(dto.MembershipIndexValue)
type LevelDBMembershipIndex struct {
persistence *leveldb.LevelDBPersistence
}
func (l *LevelDBMembershipIndex) Close() {
l.persistence.Close()
}
func (l *LevelDBMembershipIndex) Has(k proto.Message) (bool, error) {
return l.persistence.Has(k)
}
func (l *LevelDBMembershipIndex) Drop(k proto.Message) error {
return l.persistence.Drop(k)
}
func (l *LevelDBMembershipIndex) Put(k proto.Message) error {
return l.persistence.Put(k, existenceValue)
}
type LevelDBIndexOptions struct {
leveldb.LevelDBOptions
}
func NewLevelDBMembershipIndex(o LevelDBIndexOptions) (i *LevelDBMembershipIndex, err error) {
leveldbPersistence, err := leveldb.NewLevelDBPersistence(o.LevelDBOptions)
if err != nil {
return nil, err
}
return &LevelDBMembershipIndex{
persistence: leveldbPersistence,
}, nil
}
func (l *LevelDBMembershipIndex) Commit(batch raw.Batch) error {
return l.persistence.Commit(batch)
}
// CompactKeyspace compacts the entire database's keyspace.
//
// Beware that it would probably be imprudent to run this on a live user-facing
// server due to latency implications.
func (l *LevelDBMembershipIndex) Prune() {
l.persistence.Prune()
}
func (l *LevelDBMembershipIndex) Size() (uint64, error) {
return l.persistence.Size()
}
func (l *LevelDBMembershipIndex) State() *raw.DatabaseState {
return l.persistence.State()
}

View file

@ -19,9 +19,25 @@ import (
"github.com/prometheus/prometheus/storage"
)
// Database provides a few very basic methods to manage a database and inquire
// its state.
type Database interface {
// Close reaps all of the underlying system resources associated with
// this database. For databases that don't need that kind of clean-up,
// it is implemented as a no-op (so that clients don't need to reason
// and always call Close 'just in case').
Close() error
// State reports the state of the database as a DatabaseState object.
State() *DatabaseState
// Size returns the total size of the database in bytes. The number may
// be an approximation, depending on the underlying database type.
Size() (uint64, error)
}
// ForEacher is implemented by databases that can be iterated through.
type ForEacher interface {
// ForEach is responsible for iterating through all records in the database
// until one of the following conditions are met:
// ForEach is responsible for iterating through all records in the
// database until one of the following conditions are met:
//
// 1.) A system anomaly in the database scan.
// 2.) The last record in the database is reached.
@ -31,18 +47,21 @@ type ForEacher interface {
ForEach(storage.RecordDecoder, storage.RecordFilter, storage.RecordOperator) (scannedEntireCorpus bool, err error)
}
// Pruner is implemented by a database that can be pruned in some way.
type Pruner interface {
Prune()
}
// Persistence models a key-value store for bytes that supports various
// additional operations.
type Persistence interface {
Database
ForEacher
// Close reaps all of the underlying system resources associated with this
// persistence.
Close() error
// Has informs the user whether a given key exists in the database.
Has(key proto.Message) (bool, error)
// Get retrieves the key from the database if it exists or returns nil if
// it is absent.
// Get populates 'value' with the value of 'key', if present, in which
// case 'present' is returned as true.
Get(key, value proto.Message) (present bool, err error)
// Drop removes the key from the database.
Drop(key proto.Message) error
@ -56,15 +75,11 @@ type Persistence interface {
// en masse. The interface implies no protocol around the atomicity of
// effectuation.
type Batch interface {
// Close reaps all of the underlying system resources associated with this
// batch mutation.
// Close reaps all of the underlying system resources associated with
// this batch mutation.
Close()
// Put follows the same protocol as Persistence.Put.
Put(key, value proto.Message)
// Drop follows the same protocol as Persistence.Drop.
Drop(key proto.Message)
}
type Pruner interface {
Prune() (noop bool, err error)
}

View file

@ -26,6 +26,7 @@ type batch struct {
puts uint32
}
// NewBatch returns a fully allocated batch object.
func NewBatch() *batch {
return &batch{
batch: levigo.NewWriteBatch(),

View file

@ -20,5 +20,5 @@ import (
)
func TestInterfaceAdherence(t *testing.T) {
var _ raw.Persistence = new(LevelDBPersistence)
var _ raw.Persistence = &LevelDBPersistence{}
}

View file

@ -20,6 +20,7 @@ import (
// TODO: Evaluate whether to use coding.Encoder for the key and values instead
// raw bytes for consistency reasons.
// Iterator provides method to iterate through a leveldb.
type Iterator interface {
Error() error
Valid() bool

View file

@ -24,7 +24,8 @@ import (
"github.com/prometheus/prometheus/storage/raw"
)
// LevelDBPersistence is a disk-backed sorted key-value store.
// LevelDBPersistence is a disk-backed sorted key-value store. It implements the
// interfaces raw.Database, raw.ForEacher, raw.Pruner, raw.Persistence.
type LevelDBPersistence struct {
path string
name string
@ -43,23 +44,24 @@ type LevelDBPersistence struct {
type levigoIterator struct {
// iterator is the receiver of most proxied operation calls.
iterator *levigo.Iterator
// readOptions is only set if the iterator is a snapshot of an underlying
// database. This signals that it needs to be explicitly reaped upon the
// end of this iterator's life.
// readOptions is only set if the iterator is a snapshot of an
// underlying database. This signals that it needs to be explicitly
// reaped upon the end of this iterator's life.
readOptions *levigo.ReadOptions
// snapshot is only set if the iterator is a snapshot of an underlying
// database. This signals that it needs to be explicitly reaped upon the
// end of this this iterator's life.
// database. This signals that it needs to be explicitly reaped upon
// the end of this this iterator's life.
snapshot *levigo.Snapshot
// storage is only set if the iterator is a snapshot of an underlying
// database. This signals that it needs to be explicitly reaped upon the
// end of this this iterator's life. The snapshot must be freed in the
// context of an actual database.
// database. This signals that it needs to be explicitly reaped upon
// the end of this this iterator's life. The snapshot must be freed in
// the context of an actual database.
storage *levigo.DB
// closed indicates whether the iterator has been closed before.
closed bool
// valid indicates whether the iterator may be used. If a LevelDB iterator
// ever becomes invalid, it must be disposed of and cannot be reused.
// valid indicates whether the iterator may be used. If a LevelDB
// iterator ever becomes invalid, it must be disposed of and cannot be
// reused.
valid bool
// creationTime provides the time at which the iterator was made.
creationTime time.Time
@ -191,13 +193,16 @@ func (i *levigoIterator) Valid() bool {
return i.valid
}
// Compression defines the compression mode.
type Compression uint
// Possible compression modes.
const (
Snappy Compression = iota
Uncompressed
)
// LevelDBOptions bundles options needed to create a LevelDBPersistence object.
type LevelDBOptions struct {
Path string
Name string
@ -212,6 +217,8 @@ type LevelDBOptions struct {
Compression Compression
}
// NewLevelDBPersistence returns an initialized LevelDBPersistence object,
// created with the given options.
func NewLevelDBPersistence(o LevelDBOptions) (*LevelDBPersistence, error) {
options := levigo.NewOptions()
options.SetCreateIfMissing(true)
@ -257,9 +264,10 @@ func NewLevelDBPersistence(o LevelDBOptions) (*LevelDBPersistence, error) {
}, nil
}
// Close implements raw.Persistence (and raw.Database).
func (l *LevelDBPersistence) Close() error {
// These are deferred to take advantage of forced closing in case of stack
// unwinding due to anomalies.
// These are deferred to take advantage of forced closing in case of
// stack unwinding due to anomalies.
defer func() {
if l.filterPolicy != nil {
l.filterPolicy.Close()
@ -299,6 +307,7 @@ func (l *LevelDBPersistence) Close() error {
return nil
}
// Get implements raw.Persistence.
func (l *LevelDBPersistence) Get(k, v proto.Message) (bool, error) {
buf, _ := buffers.Get()
defer buffers.Give(buf)
@ -328,10 +337,12 @@ func (l *LevelDBPersistence) Get(k, v proto.Message) (bool, error) {
return true, nil
}
// Has implements raw.Persistence.
func (l *LevelDBPersistence) Has(k proto.Message) (has bool, err error) {
return l.Get(k, nil)
}
// Drop implements raw.Persistence.
func (l *LevelDBPersistence) Drop(k proto.Message) error {
buf, _ := buffers.Get()
defer buffers.Give(buf)
@ -343,27 +354,29 @@ func (l *LevelDBPersistence) Drop(k proto.Message) error {
return l.storage.Delete(l.writeOptions, buf.Bytes())
}
func (l *LevelDBPersistence) Put(key, value proto.Message) error {
// Put implements raw.Persistence.
func (l *LevelDBPersistence) Put(k, v proto.Message) error {
keyBuf, _ := buffers.Get()
defer buffers.Give(keyBuf)
if err := keyBuf.Marshal(key); err != nil {
if err := keyBuf.Marshal(k); err != nil {
panic(err)
}
valBuf, _ := buffers.Get()
defer buffers.Give(valBuf)
if err := valBuf.Marshal(value); err != nil {
if err := valBuf.Marshal(v); err != nil {
panic(err)
}
return l.storage.Put(l.writeOptions, keyBuf.Bytes(), valBuf.Bytes())
}
// Commit implements raw.Persistence.
func (l *LevelDBPersistence) Commit(b raw.Batch) (err error) {
// XXX: This is a wart to clean up later. Ideally, after doing extensive
// tests, we could create a Batch struct that journals pending
// XXX: This is a wart to clean up later. Ideally, after doing
// extensive tests, we could create a Batch struct that journals pending
// operations which the given Persistence implementation could convert
// to its specific commit requirements.
batch, ok := b.(*batch)
@ -374,7 +387,7 @@ func (l *LevelDBPersistence) Commit(b raw.Batch) (err error) {
return l.storage.Write(l.writeOptions, batch.batch)
}
// CompactKeyspace compacts the entire database's keyspace.
// Prune implements raw.Pruner. It compacts the entire keyspace of the database.
//
// Beware that it would probably be imprudent to run this on a live user-facing
// server due to latency implications.
@ -389,6 +402,8 @@ func (l *LevelDBPersistence) Prune() {
l.storage.CompactRange(keyspace)
}
// Size returns the approximate size the entire database takes on disk (in
// bytes). It implements the raw.Database interface.
func (l *LevelDBPersistence) Size() (uint64, error) {
iterator, err := l.NewIterator(false)
if err != nil {
@ -459,6 +474,7 @@ func (l *LevelDBPersistence) NewIterator(snapshotted bool) (Iterator, error) {
}, nil
}
// ForEach implements raw.ForEacher.
func (l *LevelDBPersistence) ForEach(decoder storage.RecordDecoder, filter storage.RecordFilter, operator storage.RecordOperator) (scannedEntireCorpus bool, err error) {
iterator, err := l.NewIterator(true)
if err != nil {
@ -482,11 +498,11 @@ func (l *LevelDBPersistence) ForEach(decoder storage.RecordDecoder, filter stora
}
switch filter.Filter(decodedKey, decodedValue) {
case storage.STOP:
case storage.Stop:
return
case storage.SKIP:
case storage.Skip:
continue
case storage.ACCEPT:
case storage.Accept:
opErr := operator.Operate(decodedKey, decodedValue)
if opErr != nil {
if opErr.Continuable {

View file

@ -23,6 +23,11 @@ const (
sstablesKey = "leveldb.sstables"
)
// State returns the DatabaseState. It implements the raw.Database interface and
// sets the following Supplemental entries:
// "Low Level": leveldb property value for "leveldb.stats"
// "SSTable": leveldb property value for "leveldb.sstables"
// "Errors": only set if an error has occurred determining the size
func (l *LevelDBPersistence) State() *raw.DatabaseState {
databaseState := &raw.DatabaseState{
Location: l.path,

View file

@ -23,8 +23,8 @@ import (
const cacheCapacity = 0
type (
// Pair models a prospective (key, value) double that will be committed to
// a database.
// Pair models a prospective (key, value) double that will be committed
// to a database.
Pair interface {
Get() (key, value proto.Message)
}
@ -32,17 +32,18 @@ type (
// Pairs models a list of Pair for disk committing.
Pairs []Pair
// Preparer readies a LevelDB store for a given raw state given the fixtures
// definitions passed into it.
// Preparer readies a LevelDB store for a given raw state given the
// fixtures definitions passed into it.
Preparer interface {
// Prepare furnishes the database and returns its path along with any
// encountered anomalies.
// Prepare furnishes the database and returns its path along
// with any encountered anomalies.
Prepare(namespace string, f FixtureFactory) test.TemporaryDirectory
}
// FixtureFactory is an iterator emitting fixture data.
FixtureFactory interface {
// HasNext indicates whether the FixtureFactory has more pending fixture
// data to build.
// HasNext indicates whether the FixtureFactory has more pending
// fixture data to build.
HasNext() (has bool)
// Next emits the next (key, value) double for storage.
Next() (key, value proto.Message)
@ -85,10 +86,12 @@ func (p preparer) Prepare(n string, f FixtureFactory) (t test.TemporaryDirectory
return
}
// HasNext implements FixtureFactory.
func (f cassetteFactory) HasNext() bool {
return f.index < f.count
}
// Next implements FixtureFactory.
func (f *cassetteFactory) Next() (key, value proto.Message) {
key, value = f.pairs[f.index].Get()

View file

@ -17,6 +17,7 @@ import (
"github.com/prometheus/prometheus/utility"
)
// DatabaseState contains some fundamental attributes of a database.
type DatabaseState struct {
Name string
@ -28,12 +29,17 @@ type DatabaseState struct {
Supplemental map[string]string
}
// DatabaseStates is a sortable slice of DatabaseState pointers. It implements
// sort.Interface.
type DatabaseStates []*DatabaseState
// Len implements sort.Interface.
func (s DatabaseStates) Len() int {
return len(s)
}
// Less implements sort.Interface. The primary sorting criterion is the Name,
// the secondary criterion is the Size.
func (s DatabaseStates) Less(i, j int) bool {
l := s[i]
r := s[j]
@ -48,6 +54,7 @@ func (s DatabaseStates) Less(i, j int) bool {
return l.Size < r.Size
}
// Swap implements sort.Interface.
func (s DatabaseStates) Swap(i, j int) {
s[i], s[j] = s[j], s[i]
}

View file

@ -17,7 +17,7 @@ import (
const (
putEndpoint = "/api/put"
contentTypeJson = "application/json"
contentTypeJSON = "application/json"
)
var (
@ -30,7 +30,7 @@ type Client struct {
httpClient *http.Client
}
// Create a new Client.
// NewClient creates a new Client.
func NewClient(url string, timeout time.Duration) *Client {
return &Client{
url: url,
@ -47,12 +47,13 @@ type StoreSamplesRequest struct {
Tags map[string]string `json:"tags"`
}
// Escape Prometheus label values to valid tag values for OpenTSDB.
// escapeTagValue escapes Prometheus label values to valid tag values for
// OpenTSDB.
func escapeTagValue(l clientmodel.LabelValue) string {
return illegalCharsRE.ReplaceAllString(string(l), "_")
}
// Translate Prometheus metric into OpenTSDB tags.
// tagsFromMetric translates Prometheus metric into OpenTSDB tags.
func tagsFromMetric(m clientmodel.Metric) map[string]string {
tags := make(map[string]string, len(m)-1)
for l, v := range m {
@ -64,7 +65,7 @@ func tagsFromMetric(m clientmodel.Metric) map[string]string {
return tags
}
// Send a batch of samples to OpenTSDB via its HTTP API.
// Store sends a batch of samples to OpenTSDB via its HTTP API.
func (c *Client) Store(samples clientmodel.Samples) error {
reqs := make([]StoreSamplesRequest, 0, len(samples))
for _, s := range samples {
@ -91,7 +92,7 @@ func (c *Client) Store(samples clientmodel.Samples) error {
resp, err := c.httpClient.Post(
u.String(),
contentTypeJson,
contentTypeJSON,
bytes.NewBuffer(buf),
)
if err != nil {
@ -116,5 +117,5 @@ func (c *Client) Store(samples clientmodel.Samples) error {
if err := json.Unmarshal(buf, &r); err != nil {
return err
}
return fmt.Errorf("Failed to write %d samples to OpenTSDB, %d succeeded", r["failed"], r["success"])
return fmt.Errorf("failed to write %d samples to OpenTSDB, %d succeeded", r["failed"], r["success"])
}

View file

@ -47,7 +47,7 @@ type TSDBQueueManager struct {
drained chan bool
}
// Build a new TSDBQueueManager.
// NewTSDBQueueManager builds a new TSDBQueueManager.
func NewTSDBQueueManager(tsdb TSDBClient, queueCapacity int) *TSDBQueueManager {
return &TSDBQueueManager{
tsdb: tsdb,
@ -57,8 +57,8 @@ func NewTSDBQueueManager(tsdb TSDBClient, queueCapacity int) *TSDBQueueManager {
}
}
// Queue a sample batch to be sent to the TSDB. This drops the most recently
// queued samples on the floor if the queue is full.
// Queue queues a sample batch to be sent to the TSDB. It drops the most
// recently queued samples on the floor if the queue is full.
func (t *TSDBQueueManager) Queue(s clientmodel.Samples) {
select {
case t.queue <- s:
@ -85,13 +85,13 @@ func (t *TSDBQueueManager) sendSamples(s clientmodel.Samples) {
}
}
// Report notification queue occupancy and capacity.
// reportQueues reports notification queue occupancy and capacity.
func (t *TSDBQueueManager) reportQueues() {
queueSize.Set(map[string]string{facet: occupancy}, float64(len(t.queue)))
queueSize.Set(map[string]string{facet: capacity}, float64(cap(t.queue)))
}
// Continuously send samples to the TSDB.
// Run continuously sends samples to the TSDB.
func (t *TSDBQueueManager) Run() {
defer func() {
close(t.drained)
@ -129,7 +129,7 @@ func (t *TSDBQueueManager) Run() {
}
}
// Flush remaining queued samples.
// Flush flushes remaining queued samples.
func (t *TSDBQueueManager) flush() {
if len(t.pendingSamples) > 0 {
go t.sendSamples(t.pendingSamples)
@ -137,7 +137,8 @@ func (t *TSDBQueueManager) flush() {
t.pendingSamples = t.pendingSamples[:0]
}
// Stop sending samples to the TSDB and wait for pending sends to complete.
// Close stops sending samples to the TSDB and waits for pending sends to
// complete.
func (t *TSDBQueueManager) Close() {
glog.Infof("TSDB queue manager shutting down...")
close(t.queue)