Code Review: First pass.

This commit is contained in:
Matt T. Proud 2013-08-05 17:31:49 +02:00
parent d8792cfd86
commit 07ac921aec
12 changed files with 237 additions and 177 deletions

35
main.go
View file

@ -28,6 +28,7 @@ import (
"github.com/prometheus/prometheus/retrieval" "github.com/prometheus/prometheus/retrieval"
"github.com/prometheus/prometheus/rules" "github.com/prometheus/prometheus/rules"
"github.com/prometheus/prometheus/storage/metric" "github.com/prometheus/prometheus/storage/metric"
"github.com/prometheus/prometheus/web" "github.com/prometheus/prometheus/web"
"github.com/prometheus/prometheus/web/api" "github.com/prometheus/prometheus/web/api"
) )
@ -72,10 +73,9 @@ type prometheus struct {
bodyCompactionTimer *time.Ticker bodyCompactionTimer *time.Ticker
tailCompactionTimer *time.Ticker tailCompactionTimer *time.Ticker
deletionTimer *time.Ticker deletionTimer *time.Ticker
reportDatabasesTimer *time.Ticker
curationMutex sync.Mutex curationMutex sync.Mutex
curationState chan metric.CurationState curationState chan metric.CurationState
databaseStates chan []string
stopBackgroundOperations chan bool stopBackgroundOperations chan bool
unwrittenSamples chan *extraction.Result unwrittenSamples chan *extraction.Result
@ -141,10 +141,6 @@ func (p *prometheus) close() {
p.deletionTimer.Stop() p.deletionTimer.Stop()
} }
if p.reportDatabasesTimer != nil {
p.reportDatabasesTimer.Stop()
}
if len(p.stopBackgroundOperations) == 0 { if len(p.stopBackgroundOperations) == 0 {
p.stopBackgroundOperations <- true p.stopBackgroundOperations <- true
} }
@ -157,26 +153,6 @@ func (p *prometheus) close() {
close(p.notifications) close(p.notifications)
close(p.stopBackgroundOperations) close(p.stopBackgroundOperations)
close(p.curationState) close(p.curationState)
close(p.databaseStates)
}
func (p *prometheus) reportDatabaseState() {
for _ = range p.reportDatabasesTimer.C {
// BUG(matt): Per Julius, ...
// These channel magic tricks confuse me and seem a bit awkward just to
// pass a status around. Now that we have Go 1.1, would it be maybe be
// nicer to pass ts.DiskStorage.States as a method value
// (http://tip.golang.org/ref/spec#Method_values) to the web layer
// instead of doing this?
select {
case <-p.databaseStates:
// Reset the future database state if nobody consumes it.
case p.databaseStates <- p.storage.DiskStorage.States():
// Set the database state so someone can consume it if they want.
default:
// Don't block.
}
}
} }
func main() { func main() {
@ -206,7 +182,6 @@ func main() {
unwrittenSamples := make(chan *extraction.Result, *samplesQueueCapacity) unwrittenSamples := make(chan *extraction.Result, *samplesQueueCapacity)
curationState := make(chan metric.CurationState, 1) curationState := make(chan metric.CurationState, 1)
databaseStates := make(chan []string, 1)
// Coprime numbers, fool! // Coprime numbers, fool!
headCompactionTimer := time.NewTicker(*headCompactInterval) headCompactionTimer := time.NewTicker(*headCompactInterval)
bodyCompactionTimer := time.NewTicker(*bodyCompactInterval) bodyCompactionTimer := time.NewTicker(*bodyCompactInterval)
@ -254,7 +229,7 @@ func main() {
} }
databasesHandler := &web.DatabasesHandler{ databasesHandler := &web.DatabasesHandler{
Incoming: databaseStates, Provider: ts.DiskStorage,
} }
metricsService := &api.MetricsService{ metricsService := &api.MetricsService{
@ -277,10 +252,7 @@ func main() {
deletionTimer: deletionTimer, deletionTimer: deletionTimer,
reportDatabasesTimer: time.NewTicker(15 * time.Minute),
curationState: curationState, curationState: curationState,
databaseStates: databaseStates,
unwrittenSamples: unwrittenSamples, unwrittenSamples: unwrittenSamples,
@ -297,7 +269,6 @@ func main() {
<-storageStarted <-storageStarted
go prometheus.interruptHandler() go prometheus.interruptHandler()
go prometheus.reportDatabaseState()
go func() { go func() {
for _ = range prometheus.headCompactionTimer.C { for _ = range prometheus.headCompactionTimer.C {

View file

@ -14,6 +14,7 @@
package metric package metric
import ( import (
"io"
"sort" "sort"
"code.google.com/p/goprotobuf/proto" "code.google.com/p/goprotobuf/proto"
@ -30,14 +31,16 @@ import (
type FingerprintMetricMapping map[clientmodel.Fingerprint]clientmodel.Metric type FingerprintMetricMapping map[clientmodel.Fingerprint]clientmodel.Metric
type FingerprintMetricIndex interface { type FingerprintMetricIndex interface {
io.Closer
raw.Pruner
IndexBatch(FingerprintMetricMapping) error IndexBatch(FingerprintMetricMapping) error
Lookup(*clientmodel.Fingerprint) (m clientmodel.Metric, ok bool, err error) Lookup(*clientmodel.Fingerprint) (m clientmodel.Metric, ok bool, err error)
Close() error State() *raw.DatabaseState
State() string
Size() (s uint64, present bool, err error) Size() (s uint64, present bool, err error)
} }
type leveldbFingerprintMetricIndex struct { type LeveldbFingerprintMetricIndex struct {
p *leveldb.LevelDBPersistence p *leveldb.LevelDBPersistence
} }
@ -45,22 +48,22 @@ type LevelDBFingerprintMetricIndexOptions struct {
leveldb.LevelDBOptions leveldb.LevelDBOptions
} }
func (i *leveldbFingerprintMetricIndex) Close() error { func (i *LeveldbFingerprintMetricIndex) Close() error {
i.p.Close() i.p.Close()
return nil return nil
} }
func (i *leveldbFingerprintMetricIndex) State() string { func (i *LeveldbFingerprintMetricIndex) State() *raw.DatabaseState {
return i.p.State() return i.p.State()
} }
func (i *leveldbFingerprintMetricIndex) Size() (uint64, bool, error) { func (i *LeveldbFingerprintMetricIndex) Size() (uint64, bool, error) {
s, err := i.p.ApproximateSize() s, err := i.p.ApproximateSize()
return s, true, err return s, true, err
} }
func (i *leveldbFingerprintMetricIndex) IndexBatch(mapping FingerprintMetricMapping) error { func (i *LeveldbFingerprintMetricIndex) IndexBatch(mapping FingerprintMetricMapping) error {
b := leveldb.NewBatch() b := leveldb.NewBatch()
defer b.Close() defer b.Close()
@ -76,7 +79,7 @@ func (i *leveldbFingerprintMetricIndex) IndexBatch(mapping FingerprintMetricMapp
return i.p.Commit(b) return i.p.Commit(b)
} }
func (i *leveldbFingerprintMetricIndex) Lookup(f *clientmodel.Fingerprint) (m clientmodel.Metric, ok bool, err error) { func (i *LeveldbFingerprintMetricIndex) Lookup(f *clientmodel.Fingerprint) (m clientmodel.Metric, ok bool, err error) {
k := new(dto.Fingerprint) k := new(dto.Fingerprint)
dumpFingerprint(k, f) dumpFingerprint(k, f)
v := new(dto.Metric) v := new(dto.Metric)
@ -95,13 +98,19 @@ func (i *leveldbFingerprintMetricIndex) Lookup(f *clientmodel.Fingerprint) (m cl
return m, true, nil return m, true, nil
} }
func (i *LeveldbFingerprintMetricIndex) Prune() (bool, error) {
i.p.Prune()
return false, nil
}
func NewLevelDBFingerprintMetricIndex(o *LevelDBFingerprintMetricIndexOptions) (FingerprintMetricIndex, error) { func NewLevelDBFingerprintMetricIndex(o *LevelDBFingerprintMetricIndexOptions) (FingerprintMetricIndex, error) {
s, err := leveldb.NewLevelDBPersistence(&o.LevelDBOptions) s, err := leveldb.NewLevelDBPersistence(&o.LevelDBOptions)
if err != nil { if err != nil {
return nil, err return nil, err
} }
return &leveldbFingerprintMetricIndex{ return &LeveldbFingerprintMetricIndex{
p: s, p: s,
}, nil }, nil
} }
@ -109,19 +118,21 @@ func NewLevelDBFingerprintMetricIndex(o *LevelDBFingerprintMetricIndexOptions) (
type LabelNameFingerprintMapping map[clientmodel.LabelName]clientmodel.Fingerprints type LabelNameFingerprintMapping map[clientmodel.LabelName]clientmodel.Fingerprints
type LabelNameFingerprintIndex interface { type LabelNameFingerprintIndex interface {
io.Closer
raw.Pruner
IndexBatch(LabelNameFingerprintMapping) error IndexBatch(LabelNameFingerprintMapping) error
Lookup(clientmodel.LabelName) (fps clientmodel.Fingerprints, ok bool, err error) Lookup(clientmodel.LabelName) (fps clientmodel.Fingerprints, ok bool, err error)
Has(clientmodel.LabelName) (ok bool, err error) Has(clientmodel.LabelName) (ok bool, err error)
Close() error State() *raw.DatabaseState
State() string
Size() (s uint64, present bool, err error) Size() (s uint64, present bool, err error)
} }
type leveldbLabelNameFingerprintIndex struct { type LeveldbLabelNameFingerprintIndex struct {
p *leveldb.LevelDBPersistence p *leveldb.LevelDBPersistence
} }
func (i *leveldbLabelNameFingerprintIndex) IndexBatch(b LabelNameFingerprintMapping) error { func (i *LeveldbLabelNameFingerprintIndex) IndexBatch(b LabelNameFingerprintMapping) error {
batch := leveldb.NewBatch() batch := leveldb.NewBatch()
defer batch.Close() defer batch.Close()
@ -144,7 +155,7 @@ func (i *leveldbLabelNameFingerprintIndex) IndexBatch(b LabelNameFingerprintMapp
return i.p.Commit(batch) return i.p.Commit(batch)
} }
func (i *leveldbLabelNameFingerprintIndex) Lookup(l clientmodel.LabelName) (fps clientmodel.Fingerprints, ok bool, err error) { func (i *LeveldbLabelNameFingerprintIndex) Lookup(l clientmodel.LabelName) (fps clientmodel.Fingerprints, ok bool, err error) {
k := new(dto.LabelName) k := new(dto.LabelName)
dumpLabelName(k, l) dumpLabelName(k, l)
v := new(dto.FingerprintCollection) v := new(dto.FingerprintCollection)
@ -165,24 +176,30 @@ func (i *leveldbLabelNameFingerprintIndex) Lookup(l clientmodel.LabelName) (fps
return fps, true, nil return fps, true, nil
} }
func (i *leveldbLabelNameFingerprintIndex) Has(l clientmodel.LabelName) (ok bool, err error) { func (i *LeveldbLabelNameFingerprintIndex) Has(l clientmodel.LabelName) (ok bool, err error) {
return i.p.Has(&dto.LabelName{ return i.p.Has(&dto.LabelName{
Name: proto.String(string(l)), Name: proto.String(string(l)),
}) })
} }
func (i *leveldbLabelNameFingerprintIndex) Close() error { func (i *LeveldbLabelNameFingerprintIndex) Prune() (bool, error) {
i.p.Prune()
return false, nil
}
func (i *LeveldbLabelNameFingerprintIndex) Close() error {
i.p.Close() i.p.Close()
return nil return nil
} }
func (i *leveldbLabelNameFingerprintIndex) Size() (uint64, bool, error) { func (i *LeveldbLabelNameFingerprintIndex) Size() (uint64, bool, error) {
s, err := i.p.ApproximateSize() s, err := i.p.ApproximateSize()
return s, true, err return s, true, err
} }
func (i *leveldbLabelNameFingerprintIndex) State() string { func (i *LeveldbLabelNameFingerprintIndex) State() *raw.DatabaseState {
return i.p.State() return i.p.State()
} }
@ -196,7 +213,7 @@ func NewLevelLabelNameFingerprintIndex(o *LevelDBLabelNameFingerprintIndexOption
return nil, err return nil, err
} }
return &leveldbLabelNameFingerprintIndex{ return &LeveldbLabelNameFingerprintIndex{
p: s, p: s,
}, nil }, nil
} }
@ -204,17 +221,18 @@ func NewLevelLabelNameFingerprintIndex(o *LevelDBLabelNameFingerprintIndexOption
type LabelSetFingerprintMapping map[LabelPair]clientmodel.Fingerprints type LabelSetFingerprintMapping map[LabelPair]clientmodel.Fingerprints
type LabelSetFingerprintIndex interface { type LabelSetFingerprintIndex interface {
io.Closer
raw.ForEacher raw.ForEacher
raw.Pruner
IndexBatch(LabelSetFingerprintMapping) error IndexBatch(LabelSetFingerprintMapping) error
Lookup(*LabelPair) (m clientmodel.Fingerprints, ok bool, err error) Lookup(*LabelPair) (m clientmodel.Fingerprints, ok bool, err error)
Has(*LabelPair) (ok bool, err error) Has(*LabelPair) (ok bool, err error)
Close() error State() *raw.DatabaseState
State() string
Size() (s uint64, present bool, err error) Size() (s uint64, present bool, err error)
} }
type leveldbLabelSetFingerprintIndex struct { type LeveldbLabelSetFingerprintIndex struct {
p *leveldb.LevelDBPersistence p *leveldb.LevelDBPersistence
} }
@ -222,7 +240,7 @@ type LevelDBLabelSetFingerprintIndexOptions struct {
leveldb.LevelDBOptions leveldb.LevelDBOptions
} }
func (i *leveldbLabelSetFingerprintIndex) IndexBatch(m LabelSetFingerprintMapping) error { func (i *LeveldbLabelSetFingerprintIndex) IndexBatch(m LabelSetFingerprintMapping) error {
batch := leveldb.NewBatch() batch := leveldb.NewBatch()
defer batch.Close() defer batch.Close()
@ -246,7 +264,7 @@ func (i *leveldbLabelSetFingerprintIndex) IndexBatch(m LabelSetFingerprintMappin
return i.p.Commit(batch) return i.p.Commit(batch)
} }
func (i *leveldbLabelSetFingerprintIndex) Lookup(p *LabelPair) (m clientmodel.Fingerprints, ok bool, err error) { func (i *LeveldbLabelSetFingerprintIndex) Lookup(p *LabelPair) (m clientmodel.Fingerprints, ok bool, err error) {
k := &dto.LabelPair{ k := &dto.LabelPair{
Name: proto.String(string(p.Name)), Name: proto.String(string(p.Name)),
Value: proto.String(string(p.Value)), Value: proto.String(string(p.Value)),
@ -271,7 +289,7 @@ func (i *leveldbLabelSetFingerprintIndex) Lookup(p *LabelPair) (m clientmodel.Fi
return m, true, nil return m, true, nil
} }
func (i *leveldbLabelSetFingerprintIndex) Has(p *LabelPair) (ok bool, err error) { func (i *LeveldbLabelSetFingerprintIndex) Has(p *LabelPair) (ok bool, err error) {
k := &dto.LabelPair{ k := &dto.LabelPair{
Name: proto.String(string(p.Name)), Name: proto.String(string(p.Name)),
Value: proto.String(string(p.Value)), Value: proto.String(string(p.Value)),
@ -280,21 +298,27 @@ func (i *leveldbLabelSetFingerprintIndex) Has(p *LabelPair) (ok bool, err error)
return i.p.Has(k) return i.p.Has(k)
} }
func (i *leveldbLabelSetFingerprintIndex) ForEach(d storage.RecordDecoder, f storage.RecordFilter, o storage.RecordOperator) (bool, error) { func (i *LeveldbLabelSetFingerprintIndex) ForEach(d storage.RecordDecoder, f storage.RecordFilter, o storage.RecordOperator) (bool, error) {
return i.p.ForEach(d, f, o) return i.p.ForEach(d, f, o)
} }
func (i *leveldbLabelSetFingerprintIndex) Close() error { func (i *LeveldbLabelSetFingerprintIndex) Prune() (bool, error) {
i.p.Prune()
return false, nil
}
func (i *LeveldbLabelSetFingerprintIndex) Close() error {
i.p.Close() i.p.Close()
return nil return nil
} }
func (i *leveldbLabelSetFingerprintIndex) Size() (uint64, bool, error) { func (i *LeveldbLabelSetFingerprintIndex) Size() (uint64, bool, error) {
s, err := i.p.ApproximateSize() s, err := i.p.ApproximateSize()
return s, true, err return s, true, err
} }
func (i *leveldbLabelSetFingerprintIndex) State() string { func (i *LeveldbLabelSetFingerprintIndex) State() *raw.DatabaseState {
return i.p.State() return i.p.State()
} }
@ -304,26 +328,28 @@ func NewLevelDBLabelSetFingerprintIndex(o *LevelDBLabelSetFingerprintIndexOption
return nil, err return nil, err
} }
return &leveldbLabelSetFingerprintIndex{ return &LeveldbLabelSetFingerprintIndex{
p: s, p: s,
}, nil }, nil
} }
type MetricMembershipIndex interface { type MetricMembershipIndex interface {
io.Closer
raw.Pruner
IndexBatch([]clientmodel.Metric) error IndexBatch([]clientmodel.Metric) error
Has(clientmodel.Metric) (ok bool, err error) Has(clientmodel.Metric) (ok bool, err error)
Close() error State() *raw.DatabaseState
State() string
Size() (s uint64, present bool, err error) Size() (s uint64, present bool, err error)
} }
type leveldbMetricMembershipIndex struct { type LeveldbMetricMembershipIndex struct {
p *leveldb.LevelDBPersistence p *leveldb.LevelDBPersistence
} }
var existenceIdentity = new(dto.MembershipIndexValue) var existenceIdentity = new(dto.MembershipIndexValue)
func (i *leveldbMetricMembershipIndex) IndexBatch(ms []clientmodel.Metric) error { func (i *LeveldbMetricMembershipIndex) IndexBatch(ms []clientmodel.Metric) error {
batch := leveldb.NewBatch() batch := leveldb.NewBatch()
defer batch.Close() defer batch.Close()
@ -336,28 +362,34 @@ func (i *leveldbMetricMembershipIndex) IndexBatch(ms []clientmodel.Metric) error
return i.p.Commit(batch) return i.p.Commit(batch)
} }
func (i *leveldbMetricMembershipIndex) Has(m clientmodel.Metric) (ok bool, err error) { func (i *LeveldbMetricMembershipIndex) Has(m clientmodel.Metric) (ok bool, err error) {
k := new(dto.Metric) k := new(dto.Metric)
dumpMetric(k, m) dumpMetric(k, m)
return i.p.Has(k) return i.p.Has(k)
} }
func (i *leveldbMetricMembershipIndex) Close() error { func (i *LeveldbMetricMembershipIndex) Close() error {
i.p.Close() i.p.Close()
return nil return nil
} }
func (i *leveldbMetricMembershipIndex) Size() (uint64, bool, error) { func (i *LeveldbMetricMembershipIndex) Size() (uint64, bool, error) {
s, err := i.p.ApproximateSize() s, err := i.p.ApproximateSize()
return s, true, err return s, true, err
} }
func (i *leveldbMetricMembershipIndex) State() string { func (i *LeveldbMetricMembershipIndex) State() *raw.DatabaseState {
return i.p.State() return i.p.State()
} }
func (i *LeveldbMetricMembershipIndex) Prune() (bool, error) {
i.p.Prune()
return false, nil
}
type LevelDBMetricMembershipIndexOptions struct { type LevelDBMetricMembershipIndexOptions struct {
leveldb.LevelDBOptions leveldb.LevelDBOptions
} }
@ -368,7 +400,7 @@ func NewLevelDBMetricMembershipIndex(o *LevelDBMetricMembershipIndexOptions) (Me
return nil, err return nil, err
} }
return &leveldbMetricMembershipIndex{ return &LeveldbMetricMembershipIndex{
p: s, p: s,
}, nil }, nil
} }

View file

@ -28,6 +28,7 @@ import (
dto "github.com/prometheus/prometheus/model/generated" dto "github.com/prometheus/prometheus/model/generated"
"github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/storage/raw"
"github.com/prometheus/prometheus/storage/raw/leveldb" "github.com/prometheus/prometheus/storage/raw/leveldb"
"github.com/prometheus/prometheus/utility" "github.com/prometheus/prometheus/utility"
) )
@ -88,7 +89,7 @@ func (l *LevelDBMetricPersistence) Close() {
closer.Close() closer.Close()
case errorCloser: case errorCloser:
if err := closer.Close(); err != nil { if err := closer.Close(); err != nil {
log.Println("anomaly closing", err) log.Println("anomaly closing:", err)
} }
} }
} }
@ -767,14 +768,14 @@ func (l *LevelDBMetricPersistence) GetAllValuesForLabel(labelName clientmodel.La
// //
// Beware that it would probably be imprudent to run this on a live user-facing // Beware that it would probably be imprudent to run this on a live user-facing
// server due to latency implications. // server due to latency implications.
func (l *LevelDBMetricPersistence) CompactKeyspaces() { func (l *LevelDBMetricPersistence) Prune() {
l.CurationRemarks.CompactKeyspace() l.CurationRemarks.Prune()
// l.fingerprintToMetrics.CompactKeyspace() l.fingerprintToMetrics.Prune()
// l.labelNameToFingerprints.CompactKeyspace() l.labelNameToFingerprints.Prune()
// l.labelSetToFingerprints.CompactKeyspace() l.labelSetToFingerprints.Prune()
// l.MetricHighWatermarks.CompactKeyspace() l.MetricHighWatermarks.Prune()
// l.metricMembershipIndex.CompactKeyspace() l.metricMembershipIndex.Prune()
l.MetricSamples.CompactKeyspace() l.MetricSamples.Prune()
} }
func (l *LevelDBMetricPersistence) ApproximateSizes() (total uint64, err error) { func (l *LevelDBMetricPersistence) ApproximateSizes() (total uint64, err error) {
@ -818,8 +819,8 @@ func (l *LevelDBMetricPersistence) ApproximateSizes() (total uint64, err error)
return total, nil return total, nil
} }
func (l *LevelDBMetricPersistence) States() []string { func (l *LevelDBMetricPersistence) States() raw.DatabaseStates {
return []string{ return raw.DatabaseStates{
l.CurationRemarks.State(), l.CurationRemarks.State(),
l.fingerprintToMetrics.State(), l.fingerprintToMetrics.State(),
l.labelNameToFingerprints.State(), l.labelNameToFingerprints.State(),

View file

@ -15,6 +15,7 @@ package metric
import ( import (
"container/list" "container/list"
"io"
"sync" "sync"
"time" "time"
@ -170,20 +171,21 @@ func (lru *WatermarkCache) checkCapacity() {
type FingerprintHighWatermarkMapping map[clientmodel.Fingerprint]time.Time type FingerprintHighWatermarkMapping map[clientmodel.Fingerprint]time.Time
type HighWatermarker interface { type HighWatermarker interface {
io.Closer
raw.ForEacher raw.ForEacher
raw.Pruner
UpdateBatch(FingerprintHighWatermarkMapping) error UpdateBatch(FingerprintHighWatermarkMapping) error
Get(*clientmodel.Fingerprint) (t time.Time, ok bool, err error) Get(*clientmodel.Fingerprint) (t time.Time, ok bool, err error)
Close() error State() *raw.DatabaseState
State() string
Size() (uint64, bool, error) Size() (uint64, bool, error)
} }
type leveldbHighWatermarker struct { type LeveldbHighWatermarker struct {
p *leveldb.LevelDBPersistence p *leveldb.LevelDBPersistence
} }
func (w *leveldbHighWatermarker) Get(f *clientmodel.Fingerprint) (t time.Time, ok bool, err error) { func (w *LeveldbHighWatermarker) Get(f *clientmodel.Fingerprint) (t time.Time, ok bool, err error) {
k := new(dto.Fingerprint) k := new(dto.Fingerprint)
dumpFingerprint(k, f) dumpFingerprint(k, f)
v := new(dto.MetricHighWatermark) v := new(dto.MetricHighWatermark)
@ -198,7 +200,7 @@ func (w *leveldbHighWatermarker) Get(f *clientmodel.Fingerprint) (t time.Time, o
return t, true, nil return t, true, nil
} }
func (w *leveldbHighWatermarker) UpdateBatch(m FingerprintHighWatermarkMapping) error { func (w *LeveldbHighWatermarker) UpdateBatch(m FingerprintHighWatermarkMapping) error {
batch := leveldb.NewBatch() batch := leveldb.NewBatch()
defer batch.Close() defer batch.Close()
@ -217,7 +219,7 @@ func (w *leveldbHighWatermarker) UpdateBatch(m FingerprintHighWatermarkMapping)
continue continue
} }
// BUG(matt): Repace this with watermark management. // BUG(matt): Replace this with watermark management.
if t.After(existing) { if t.After(existing) {
v.Timestamp = proto.Int64(t.Unix()) v.Timestamp = proto.Int64(t.Unix())
batch.Put(k, v) batch.Put(k, v)
@ -227,21 +229,27 @@ func (w *leveldbHighWatermarker) UpdateBatch(m FingerprintHighWatermarkMapping)
return w.p.Commit(batch) return w.p.Commit(batch)
} }
func (i *leveldbHighWatermarker) ForEach(d storage.RecordDecoder, f storage.RecordFilter, o storage.RecordOperator) (bool, error) { func (i *LeveldbHighWatermarker) ForEach(d storage.RecordDecoder, f storage.RecordFilter, o storage.RecordOperator) (bool, error) {
return i.p.ForEach(d, f, o) return i.p.ForEach(d, f, o)
} }
func (i *leveldbHighWatermarker) Close() error { func (i *LeveldbHighWatermarker) Prune() (bool, error) {
i.p.Prune()
return false, nil
}
func (i *LeveldbHighWatermarker) Close() error {
i.p.Close() i.p.Close()
return nil return nil
} }
func (i *leveldbHighWatermarker) State() string { func (i *LeveldbHighWatermarker) State() *raw.DatabaseState {
return i.p.State() return i.p.State()
} }
func (i *leveldbHighWatermarker) Size() (uint64, bool, error) { func (i *LeveldbHighWatermarker) Size() (uint64, bool, error) {
s, err := i.p.ApproximateSize() s, err := i.p.ApproximateSize()
return s, true, err return s, true, err
} }
@ -256,7 +264,7 @@ func NewLevelDBHighWatermarker(o *LevelDBHighWatermarkerOptions) (HighWatermarke
return nil, err return nil, err
} }
return &leveldbHighWatermarker{ return &LeveldbHighWatermarker{
p: s, p: s,
}, nil }, nil
} }

View file

@ -67,14 +67,14 @@ func (l *LevelDBMembershipIndex) Commit(batch raw.Batch) error {
// //
// Beware that it would probably be imprudent to run this on a live user-facing // Beware that it would probably be imprudent to run this on a live user-facing
// server due to latency implications. // server due to latency implications.
func (l *LevelDBMembershipIndex) CompactKeyspace() { func (l *LevelDBMembershipIndex) Prune() {
l.persistence.CompactKeyspace() l.persistence.Prune()
} }
func (l *LevelDBMembershipIndex) ApproximateSize() (uint64, error) { func (l *LevelDBMembershipIndex) ApproximateSize() (uint64, error) {
return l.persistence.ApproximateSize() return l.persistence.ApproximateSize()
} }
func (l *LevelDBMembershipIndex) State() string { func (l *LevelDBMembershipIndex) State() *raw.DatabaseState {
return l.persistence.State() return l.persistence.State()
} }

View file

@ -64,3 +64,7 @@ type Batch interface {
// Drop follows the same protocol as Persistence.Drop. // Drop follows the same protocol as Persistence.Drop.
Drop(key proto.Message) Drop(key proto.Message)
} }
type Pruner interface {
Prune() (noop bool, err error)
}

View file

@ -163,6 +163,13 @@ func (i levigoIterator) GetError() (err error) {
return i.iterator.GetError() return i.iterator.GetError()
} }
type Compression uint
const (
Snappy Compression = iota
Uncompressed
)
type LevelDBOptions struct { type LevelDBOptions struct {
Path string Path string
Name string Name string
@ -174,15 +181,16 @@ type LevelDBOptions struct {
FlushOnMutate bool FlushOnMutate bool
UseParanoidChecks bool UseParanoidChecks bool
NotUseSnappy bool Compression Compression
} }
func NewLevelDBPersistence(o *LevelDBOptions) (*LevelDBPersistence, error) { func NewLevelDBPersistence(o *LevelDBOptions) (*LevelDBPersistence, error) {
options := levigo.NewOptions() options := levigo.NewOptions()
options.SetCreateIfMissing(true) options.SetCreateIfMissing(true)
options.SetParanoidChecks(o.UseParanoidChecks) options.SetParanoidChecks(o.UseParanoidChecks)
compression := levigo.SnappyCompression compression := levigo.SnappyCompression
if !o.NotUseSnappy { if o.Compression == Uncompressed {
compression = levigo.NoCompression compression = levigo.NoCompression
} }
options.SetCompression(compression) options.SetCompression(compression)
@ -313,7 +321,7 @@ func (l *LevelDBPersistence) Commit(b raw.Batch) (err error) {
// //
// Beware that it would probably be imprudent to run this on a live user-facing // Beware that it would probably be imprudent to run this on a live user-facing
// server due to latency implications. // server due to latency implications.
func (l *LevelDBPersistence) CompactKeyspace() { func (l *LevelDBPersistence) Prune() {
// Magic values per https://code.google.com/p/leveldb/source/browse/include/leveldb/db.h#131. // Magic values per https://code.google.com/p/leveldb/source/browse/include/leveldb/db.h#131.
keyspace := levigo.Range{ keyspace := levigo.Range{

View file

@ -14,9 +14,7 @@
package leveldb package leveldb
import ( import (
"bytes" "github.com/prometheus/prometheus/storage/raw"
"fmt"
"github.com/prometheus/prometheus/utility" "github.com/prometheus/prometheus/utility"
) )
@ -25,50 +23,22 @@ const (
sstablesKey = "leveldb.sstables" sstablesKey = "leveldb.sstables"
) )
// DatabaseState models a bundle of metadata about a LevelDB database used in func (l *LevelDBPersistence) State() *raw.DatabaseState {
// template format string interpolation. databaseState := &raw.DatabaseState{
type DatabaseState struct { Location: l.path,
Name string
Purpose string
Path string
LowLevelStatus string
SSTablesStatus string
ApproximateSize utility.ByteSize
Error error
}
func (s DatabaseState) String() string {
b := new(bytes.Buffer)
fmt.Fprintln(b, "Name:", s.Name)
fmt.Fprintln(b, "Path:", s.Path)
fmt.Fprintln(b, "Purpose:", s.Purpose)
fmt.Fprintln(b, "Low Level Diagnostics:", s.LowLevelStatus)
fmt.Fprintln(b, "SSTable Statistics:", s.SSTablesStatus)
fmt.Fprintln(b, "Approximate Size:", s.ApproximateSize)
fmt.Fprintln(b, "Error:", s.Error)
return b.String()
}
func (l *LevelDBPersistence) LowLevelState() DatabaseState {
databaseState := DatabaseState{
Path: l.path,
Name: l.name, Name: l.name,
Purpose: l.purpose, Purpose: l.purpose,
LowLevelStatus: l.storage.PropertyValue(statsKey), Supplemental: map[string]string{},
SSTablesStatus: l.storage.PropertyValue(sstablesKey),
} }
if size, err := l.ApproximateSize(); err != nil { if size, err := l.ApproximateSize(); err != nil {
databaseState.Error = err databaseState.Supplemental["Errors"] = err.Error()
} else { } else {
databaseState.ApproximateSize = utility.ByteSize(size) databaseState.Size = utility.ByteSize(size)
} }
databaseState.Supplemental["Low Level"] = l.storage.PropertyValue(statsKey)
databaseState.Supplemental["SSTable"] = l.storage.PropertyValue(sstablesKey)
return databaseState return databaseState
} }
func (l *LevelDBPersistence) State() string {
return l.LowLevelState().String()
}

53
storage/raw/state.go Normal file
View file

@ -0,0 +1,53 @@
// Copyright 2013 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package raw
import (
"github.com/prometheus/prometheus/utility"
)
type DatabaseState struct {
Name string
Size utility.ByteSize
Location string
Purpose string
Supplemental map[string]string
}
type DatabaseStates []*DatabaseState
func (s DatabaseStates) Len() int {
return len(s)
}
func (s DatabaseStates) Less(i, j int) bool {
l := s[i]
r := s[j]
if l.Name > r.Name {
return false
}
if l.Name < r.Name {
return true
}
return l.Size < r.Size
}
func (s DatabaseStates) Swap(i, j int) {
s[i], s[j] = s[j], s[i]
}

View file

@ -44,7 +44,7 @@ func main() {
log.Printf("Starting compaction...") log.Printf("Starting compaction...")
size, _ := persistences.ApproximateSizes() size, _ := persistences.ApproximateSizes()
log.Printf("Original Size: %d", size) log.Printf("Original Size: %d", size)
persistences.CompactKeyspaces() persistences.Prune()
log.Printf("Finished in %s", time.Since(start)) log.Printf("Finished in %s", time.Since(start))
size, _ = persistences.ApproximateSizes() size, _ = persistences.ApproximateSizes()
log.Printf("New Size: %d", size) log.Printf("New Size: %d", size)

View file

@ -16,26 +16,45 @@ package web
import ( import (
"net/http" "net/http"
"sync" "sync"
"time"
"github.com/prometheus/prometheus/storage/raw"
) )
type DatabasesHandler struct { type DatabaseStatesProvider interface {
States []string States() raw.DatabaseStates
}
Incoming chan []string type DatabasesHandler struct {
RefreshInterval time.Duration
NextRefresh time.Time
Current raw.DatabaseStates
Provider DatabaseStatesProvider
mutex sync.RWMutex mutex sync.RWMutex
} }
func (h *DatabasesHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { func (h *DatabasesHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
select { h.Refresh()
case states := <-h.Incoming:
h.mutex.Lock()
defer h.mutex.Unlock()
h.States = states
default:
h.mutex.RLock() h.mutex.RLock()
defer h.mutex.RUnlock() defer h.mutex.RUnlock()
}
executeTemplate(w, "databases", h) executeTemplate(w, "databases", h)
} }
func (h *DatabasesHandler) Refresh() {
h.mutex.RLock()
if !time.Now().After(h.NextRefresh) {
h.mutex.RUnlock()
return
}
h.mutex.RUnlock()
h.mutex.Lock()
defer h.mutex.Unlock()
h.Current = h.Provider.States()
h.NextRefresh = time.Now().Add(h.RefreshInterval)
}

View file

@ -3,34 +3,28 @@
{{define "content"}} {{define "content"}}
<div class="container-fluid"> <div class="container-fluid">
<h2>Database Information</h2> <h2>Database Information</h2>
{{range .States}} {{range $database := .Current }}
<h3>{{.Name}}</h3> <h3>{{$database.Name}}</h3>
<table class="table table-bordered table-condensed table-hover literal_output"> <table class="table table-bordered table-condensed table-hover literal_output">
<tbody> <tbody>
<tr> <tr>
<th>Path</th> <th>Location</th>
<td>{{.Path}}</td> <td>{{$database.Location}}</td>
</tr> </tr>
<tr> <tr>
<th>Last Refreshed</th> <th>Purpose</th>
<td>{{.LastRefreshed}}</td> <td>{{$database.Purpose}}</td>
</tr> </tr>
<tr> <tr>
<th>Type</th> <th>Size</th>
<td>{{.Type}}</td> <td>{{$database.Size}}</td>
</tr> </tr>
{{range $subject, $state := $database.Supplemental }}
<tr> <tr>
<th>Approximate Size</th> <th>{{$subject}}</th>
<td>{{.ApproximateSize}}</td> <td><pre>{{$state}}</pre></td>
</tr>
<tr>
<th>Low Level Status</th>
<td><pre>{{.LowLevelStatus}}</pre></td>
</tr>
<tr>
<th>SSTable Status</th>
<td><pre>{{.SSTablesStatus}}</pre></td>
</tr> </tr>
{{end}}
</tbody> </tbody>
</table> </table>
{{end}} {{end}}