Merge pull request #346 from prometheus/refactor/storage/componentize

Componentize Storage Subsystems
This commit is contained in:
Matt T. Proud 2013-08-05 08:50:50 -07:00
commit 032373a2c7
18 changed files with 900 additions and 439 deletions

View file

@ -17,7 +17,7 @@ include Makefile.INCLUDE
all: binary test
$(GOCC): $(BUILD_PATH)/cache/$(GOPKG) source_path
$(GOCC): $(BUILD_PATH)/cache/$(GOPKG) $(FULL_GOPATH)
tar -C $(BUILD_PATH)/root -xzf $<
touch $@
@ -58,7 +58,7 @@ format:
model: dependencies preparation
$(MAKE) -C model
preparation: $(GOCC) source_path
preparation: $(GOCC) $(FULL_GOPATH)
$(MAKE) -C $(BUILD_PATH)
race_condition_binary: build
@ -76,15 +76,14 @@ search_index:
server: config dependencies model preparation
$(MAKE) -C server
# source_path is responsible for ensuring that the builder has not done anything
# $(FULL_GOPATH) is responsible for ensuring that the builder has not done anything
# stupid like working on Prometheus outside of ${GOPATH}.
source_path:
$(FULL_GOPATH):
-[ -d "$(FULL_GOPATH)" ] || { mkdir -vp $(FULL_GOPATH_BASE) ; ln -s "$(PWD)" "$(FULL_GOPATH)" ; }
[ -d "$(FULL_GOPATH)" ]
test: build
$(GOENV) find . -maxdepth 1 -mindepth 1 -type d -and -not -path $(BUILD_PATH) -exec $(GOCC) test {}/... $(GO_TEST_FLAGS) \;
$(GO) test $(GO_TEST_FLAGS)
$(GO) test $(GO_TEST_FLAGS) ./...
tools: dependencies preparation
$(MAKE) -C tools
@ -95,4 +94,4 @@ update:
web: config dependencies model preparation
$(MAKE) -C web
.PHONY: advice binary build clean config dependencies documentation format model package preparation race_condition_binary race_condition_run run search_index source_path test tools update
.PHONY: advice binary build clean config dependencies documentation format model package preparation race_condition_binary race_condition_run run search_index test tools update

36
main.go
View file

@ -28,7 +28,7 @@ import (
"github.com/prometheus/prometheus/retrieval"
"github.com/prometheus/prometheus/rules"
"github.com/prometheus/prometheus/storage/metric"
"github.com/prometheus/prometheus/storage/raw/leveldb"
"github.com/prometheus/prometheus/web"
"github.com/prometheus/prometheus/web/api"
)
@ -73,10 +73,9 @@ type prometheus struct {
bodyCompactionTimer *time.Ticker
tailCompactionTimer *time.Ticker
deletionTimer *time.Ticker
reportDatabasesTimer *time.Ticker
curationMutex sync.Mutex
curationState chan metric.CurationState
databaseStates chan []leveldb.DatabaseState
stopBackgroundOperations chan bool
unwrittenSamples chan *extraction.Result
@ -142,10 +141,6 @@ func (p *prometheus) close() {
p.deletionTimer.Stop()
}
if p.reportDatabasesTimer != nil {
p.reportDatabasesTimer.Stop()
}
if len(p.stopBackgroundOperations) == 0 {
p.stopBackgroundOperations <- true
}
@ -158,26 +153,6 @@ func (p *prometheus) close() {
close(p.notifications)
close(p.stopBackgroundOperations)
close(p.curationState)
close(p.databaseStates)
}
func (p *prometheus) reportDatabaseState() {
for _ = range p.reportDatabasesTimer.C {
// BUG(matt): Per Julius, ...
// These channel magic tricks confuse me and seem a bit awkward just to
// pass a status around. Now that we have Go 1.1, would it be maybe be
// nicer to pass ts.DiskStorage.States as a method value
// (http://tip.golang.org/ref/spec#Method_values) to the web layer
// instead of doing this?
select {
case <-p.databaseStates:
// Reset the future database state if nobody consumes it.
case p.databaseStates <- p.storage.DiskStorage.States():
// Set the database state so someone can consume it if they want.
default:
// Don't block.
}
}
}
func main() {
@ -207,7 +182,6 @@ func main() {
unwrittenSamples := make(chan *extraction.Result, *samplesQueueCapacity)
curationState := make(chan metric.CurationState, 1)
databaseStates := make(chan []leveldb.DatabaseState, 1)
// Coprime numbers, fool!
headCompactionTimer := time.NewTicker(*headCompactInterval)
bodyCompactionTimer := time.NewTicker(*bodyCompactInterval)
@ -255,7 +229,7 @@ func main() {
}
databasesHandler := &web.DatabasesHandler{
Incoming: databaseStates,
Provider: ts.DiskStorage,
}
metricsService := &api.MetricsService{
@ -278,10 +252,7 @@ func main() {
deletionTimer: deletionTimer,
reportDatabasesTimer: time.NewTicker(15 * time.Minute),
curationState: curationState,
databaseStates: databaseStates,
unwrittenSamples: unwrittenSamples,
@ -298,7 +269,6 @@ func main() {
<-storageStarted
go prometheus.interruptHandler()
go prometheus.reportDatabaseState()
go func() {
for _ = range prometheus.headCompactionTimer.C {

View file

@ -92,7 +92,7 @@ type watermarkScanner struct {
// curated.
// curationState is the on-disk store where the curation remarks are made for
// how much progress has been made.
func (c *Curator) Run(ignoreYoungerThan time.Duration, instant time.Time, processor Processor, curationState, samples, watermarks *leveldb.LevelDBPersistence, status chan CurationState) (err error) {
func (c *Curator) Run(ignoreYoungerThan time.Duration, instant time.Time, processor Processor, curationState, samples *leveldb.LevelDBPersistence, watermarks HighWatermarker, status chan CurationState) (err error) {
defer func(t time.Time) {
duration := float64(time.Since(t) / time.Millisecond)

406
storage/metric/index.go Normal file
View file

@ -0,0 +1,406 @@
// Copyright 2013 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package metric
import (
"io"
"sort"
"code.google.com/p/goprotobuf/proto"
clientmodel "github.com/prometheus/client_golang/model"
dto "github.com/prometheus/prometheus/model/generated"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/storage/raw"
"github.com/prometheus/prometheus/storage/raw/leveldb"
)
type FingerprintMetricMapping map[clientmodel.Fingerprint]clientmodel.Metric
type FingerprintMetricIndex interface {
io.Closer
raw.Pruner
IndexBatch(FingerprintMetricMapping) error
Lookup(*clientmodel.Fingerprint) (m clientmodel.Metric, ok bool, err error)
State() *raw.DatabaseState
Size() (s uint64, present bool, err error)
}
type LeveldbFingerprintMetricIndex struct {
p *leveldb.LevelDBPersistence
}
type LevelDBFingerprintMetricIndexOptions struct {
leveldb.LevelDBOptions
}
func (i *LeveldbFingerprintMetricIndex) Close() error {
i.p.Close()
return nil
}
func (i *LeveldbFingerprintMetricIndex) State() *raw.DatabaseState {
return i.p.State()
}
func (i *LeveldbFingerprintMetricIndex) Size() (uint64, bool, error) {
s, err := i.p.ApproximateSize()
return s, true, err
}
func (i *LeveldbFingerprintMetricIndex) IndexBatch(mapping FingerprintMetricMapping) error {
b := leveldb.NewBatch()
defer b.Close()
for f, m := range mapping {
k := new(dto.Fingerprint)
dumpFingerprint(k, &f)
v := new(dto.Metric)
dumpMetric(v, m)
b.Put(k, v)
}
return i.p.Commit(b)
}
func (i *LeveldbFingerprintMetricIndex) Lookup(f *clientmodel.Fingerprint) (m clientmodel.Metric, ok bool, err error) {
k := new(dto.Fingerprint)
dumpFingerprint(k, f)
v := new(dto.Metric)
if ok, err := i.p.Get(k, v); !ok {
return nil, false, nil
} else if err != nil {
return nil, false, err
}
m = clientmodel.Metric{}
for _, pair := range v.LabelPair {
m[clientmodel.LabelName(pair.GetName())] = clientmodel.LabelValue(pair.GetValue())
}
return m, true, nil
}
func (i *LeveldbFingerprintMetricIndex) Prune() (bool, error) {
i.p.Prune()
return false, nil
}
func NewLevelDBFingerprintMetricIndex(o *LevelDBFingerprintMetricIndexOptions) (FingerprintMetricIndex, error) {
s, err := leveldb.NewLevelDBPersistence(&o.LevelDBOptions)
if err != nil {
return nil, err
}
return &LeveldbFingerprintMetricIndex{
p: s,
}, nil
}
type LabelNameFingerprintMapping map[clientmodel.LabelName]clientmodel.Fingerprints
type LabelNameFingerprintIndex interface {
io.Closer
raw.Pruner
IndexBatch(LabelNameFingerprintMapping) error
Lookup(clientmodel.LabelName) (fps clientmodel.Fingerprints, ok bool, err error)
Has(clientmodel.LabelName) (ok bool, err error)
State() *raw.DatabaseState
Size() (s uint64, present bool, err error)
}
type LeveldbLabelNameFingerprintIndex struct {
p *leveldb.LevelDBPersistence
}
func (i *LeveldbLabelNameFingerprintIndex) IndexBatch(b LabelNameFingerprintMapping) error {
batch := leveldb.NewBatch()
defer batch.Close()
for labelName, fingerprints := range b {
sort.Sort(fingerprints)
key := &dto.LabelName{
Name: proto.String(string(labelName)),
}
value := new(dto.FingerprintCollection)
for _, fingerprint := range fingerprints {
f := new(dto.Fingerprint)
dumpFingerprint(f, fingerprint)
value.Member = append(value.Member, f)
}
batch.Put(key, value)
}
return i.p.Commit(batch)
}
func (i *LeveldbLabelNameFingerprintIndex) Lookup(l clientmodel.LabelName) (fps clientmodel.Fingerprints, ok bool, err error) {
k := new(dto.LabelName)
dumpLabelName(k, l)
v := new(dto.FingerprintCollection)
ok, err = i.p.Get(k, v)
if err != nil {
return nil, false, err
}
if !ok {
return nil, false, nil
}
for _, m := range v.Member {
fp := new(clientmodel.Fingerprint)
loadFingerprint(fp, m)
fps = append(fps, fp)
}
return fps, true, nil
}
func (i *LeveldbLabelNameFingerprintIndex) Has(l clientmodel.LabelName) (ok bool, err error) {
return i.p.Has(&dto.LabelName{
Name: proto.String(string(l)),
})
}
func (i *LeveldbLabelNameFingerprintIndex) Prune() (bool, error) {
i.p.Prune()
return false, nil
}
func (i *LeveldbLabelNameFingerprintIndex) Close() error {
i.p.Close()
return nil
}
func (i *LeveldbLabelNameFingerprintIndex) Size() (uint64, bool, error) {
s, err := i.p.ApproximateSize()
return s, true, err
}
func (i *LeveldbLabelNameFingerprintIndex) State() *raw.DatabaseState {
return i.p.State()
}
type LevelDBLabelNameFingerprintIndexOptions struct {
leveldb.LevelDBOptions
}
func NewLevelLabelNameFingerprintIndex(o *LevelDBLabelNameFingerprintIndexOptions) (LabelNameFingerprintIndex, error) {
s, err := leveldb.NewLevelDBPersistence(&o.LevelDBOptions)
if err != nil {
return nil, err
}
return &LeveldbLabelNameFingerprintIndex{
p: s,
}, nil
}
type LabelSetFingerprintMapping map[LabelPair]clientmodel.Fingerprints
type LabelSetFingerprintIndex interface {
io.Closer
raw.ForEacher
raw.Pruner
IndexBatch(LabelSetFingerprintMapping) error
Lookup(*LabelPair) (m clientmodel.Fingerprints, ok bool, err error)
Has(*LabelPair) (ok bool, err error)
State() *raw.DatabaseState
Size() (s uint64, present bool, err error)
}
type LeveldbLabelSetFingerprintIndex struct {
p *leveldb.LevelDBPersistence
}
type LevelDBLabelSetFingerprintIndexOptions struct {
leveldb.LevelDBOptions
}
func (i *LeveldbLabelSetFingerprintIndex) IndexBatch(m LabelSetFingerprintMapping) error {
batch := leveldb.NewBatch()
defer batch.Close()
for pair, fps := range m {
sort.Sort(fps)
key := &dto.LabelPair{
Name: proto.String(string(pair.Name)),
Value: proto.String(string(pair.Value)),
}
value := new(dto.FingerprintCollection)
for _, fp := range fps {
f := new(dto.Fingerprint)
dumpFingerprint(f, fp)
value.Member = append(value.Member, f)
}
batch.Put(key, value)
}
return i.p.Commit(batch)
}
func (i *LeveldbLabelSetFingerprintIndex) Lookup(p *LabelPair) (m clientmodel.Fingerprints, ok bool, err error) {
k := &dto.LabelPair{
Name: proto.String(string(p.Name)),
Value: proto.String(string(p.Value)),
}
v := new(dto.FingerprintCollection)
ok, err = i.p.Get(k, v)
if !ok {
return nil, false, nil
}
if err != nil {
return nil, false, err
}
for _, pair := range v.Member {
fp := new(clientmodel.Fingerprint)
loadFingerprint(fp, pair)
m = append(m, fp)
}
return m, true, nil
}
func (i *LeveldbLabelSetFingerprintIndex) Has(p *LabelPair) (ok bool, err error) {
k := &dto.LabelPair{
Name: proto.String(string(p.Name)),
Value: proto.String(string(p.Value)),
}
return i.p.Has(k)
}
func (i *LeveldbLabelSetFingerprintIndex) ForEach(d storage.RecordDecoder, f storage.RecordFilter, o storage.RecordOperator) (bool, error) {
return i.p.ForEach(d, f, o)
}
func (i *LeveldbLabelSetFingerprintIndex) Prune() (bool, error) {
i.p.Prune()
return false, nil
}
func (i *LeveldbLabelSetFingerprintIndex) Close() error {
i.p.Close()
return nil
}
func (i *LeveldbLabelSetFingerprintIndex) Size() (uint64, bool, error) {
s, err := i.p.ApproximateSize()
return s, true, err
}
func (i *LeveldbLabelSetFingerprintIndex) State() *raw.DatabaseState {
return i.p.State()
}
func NewLevelDBLabelSetFingerprintIndex(o *LevelDBLabelSetFingerprintIndexOptions) (LabelSetFingerprintIndex, error) {
s, err := leveldb.NewLevelDBPersistence(&o.LevelDBOptions)
if err != nil {
return nil, err
}
return &LeveldbLabelSetFingerprintIndex{
p: s,
}, nil
}
type MetricMembershipIndex interface {
io.Closer
raw.Pruner
IndexBatch([]clientmodel.Metric) error
Has(clientmodel.Metric) (ok bool, err error)
State() *raw.DatabaseState
Size() (s uint64, present bool, err error)
}
type LeveldbMetricMembershipIndex struct {
p *leveldb.LevelDBPersistence
}
var existenceIdentity = new(dto.MembershipIndexValue)
func (i *LeveldbMetricMembershipIndex) IndexBatch(ms []clientmodel.Metric) error {
batch := leveldb.NewBatch()
defer batch.Close()
for _, m := range ms {
k := new(dto.Metric)
dumpMetric(k, m)
batch.Put(k, existenceIdentity)
}
return i.p.Commit(batch)
}
func (i *LeveldbMetricMembershipIndex) Has(m clientmodel.Metric) (ok bool, err error) {
k := new(dto.Metric)
dumpMetric(k, m)
return i.p.Has(k)
}
func (i *LeveldbMetricMembershipIndex) Close() error {
i.p.Close()
return nil
}
func (i *LeveldbMetricMembershipIndex) Size() (uint64, bool, error) {
s, err := i.p.ApproximateSize()
return s, true, err
}
func (i *LeveldbMetricMembershipIndex) State() *raw.DatabaseState {
return i.p.State()
}
func (i *LeveldbMetricMembershipIndex) Prune() (bool, error) {
i.p.Prune()
return false, nil
}
type LevelDBMetricMembershipIndexOptions struct {
leveldb.LevelDBOptions
}
func NewLevelDBMetricMembershipIndex(o *LevelDBMetricMembershipIndexOptions) (MetricMembershipIndex, error) {
s, err := leveldb.NewLevelDBPersistence(&o.LevelDBOptions)
if err != nil {
return nil, err
}
return &LeveldbMetricMembershipIndex{
p: s,
}, nil
}

View file

@ -26,9 +26,9 @@ import (
clientmodel "github.com/prometheus/client_golang/model"
dto "github.com/prometheus/prometheus/model/generated"
index "github.com/prometheus/prometheus/storage/raw/index/leveldb"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/storage/raw"
"github.com/prometheus/prometheus/storage/raw/leveldb"
"github.com/prometheus/prometheus/utility"
)
@ -37,11 +37,11 @@ const sortConcurrency = 2
type LevelDBMetricPersistence struct {
CurationRemarks *leveldb.LevelDBPersistence
fingerprintToMetrics *leveldb.LevelDBPersistence
labelNameToFingerprints *leveldb.LevelDBPersistence
labelSetToFingerprints *leveldb.LevelDBPersistence
MetricHighWatermarks *leveldb.LevelDBPersistence
metricMembershipIndex *index.LevelDBMembershipIndex
fingerprintToMetrics FingerprintMetricIndex
labelNameToFingerprints LabelNameFingerprintIndex
labelSetToFingerprints LabelSetFingerprintIndex
MetricHighWatermarks HighWatermarker
metricMembershipIndex MetricMembershipIndex
MetricSamples *leveldb.LevelDBPersistence
}
@ -60,12 +60,15 @@ var (
)
type leveldbOpener func()
type leveldbCloser interface {
type errorCloser interface {
Close() error
}
type closer interface {
Close()
}
func (l *LevelDBMetricPersistence) Close() {
var persistences = []leveldbCloser{
var persistences = []interface{}{
l.CurationRemarks,
l.fingerprintToMetrics,
l.labelNameToFingerprints,
@ -77,14 +80,21 @@ func (l *LevelDBMetricPersistence) Close() {
closerGroup := sync.WaitGroup{}
for _, closer := range persistences {
for _, c := range persistences {
closerGroup.Add(1)
go func(closer leveldbCloser) {
if closer != nil {
go func(c interface{}) {
if c != nil {
switch closer := c.(type) {
case closer:
closer.Close()
case errorCloser:
if err := closer.Close(); err != nil {
log.Println("anomaly closing:", err)
}
}
}
closerGroup.Done()
}(closer)
}(c)
}
closerGroup.Wait()
@ -103,7 +113,14 @@ func NewLevelDBMetricPersistence(baseDirectory string) (*LevelDBMetricPersistenc
"Label Names and Value Pairs by Fingerprint",
func() {
var err error
emission.fingerprintToMetrics, err = leveldb.NewLevelDBPersistence(baseDirectory+"/label_name_and_value_pairs_by_fingerprint", *fingerprintsToLabelPairCacheSize, 10)
emission.fingerprintToMetrics, err = NewLevelDBFingerprintMetricIndex(&LevelDBFingerprintMetricIndexOptions{
LevelDBOptions: leveldb.LevelDBOptions{
Name: "Metrics by Fingerprint",
Purpose: "Index",
Path: baseDirectory + "/label_name_and_value_pairs_by_fingerprint",
CacheSizeBytes: *fingerprintsToLabelPairCacheSize,
},
})
workers.MayFail(err)
},
},
@ -111,7 +128,13 @@ func NewLevelDBMetricPersistence(baseDirectory string) (*LevelDBMetricPersistenc
"Samples by Fingerprint",
func() {
var err error
emission.MetricSamples, err = leveldb.NewLevelDBPersistence(baseDirectory+"/samples_by_fingerprint", *samplesByFingerprintCacheSize, 10)
o := &leveldb.LevelDBOptions{
Name: "Samples",
Purpose: "Timeseries",
Path: baseDirectory + "/samples_by_fingerprint",
CacheSizeBytes: *fingerprintsToLabelPairCacheSize,
}
emission.MetricSamples, err = leveldb.NewLevelDBPersistence(o)
workers.MayFail(err)
},
},
@ -119,7 +142,13 @@ func NewLevelDBMetricPersistence(baseDirectory string) (*LevelDBMetricPersistenc
"High Watermarks by Fingerprint",
func() {
var err error
emission.MetricHighWatermarks, err = leveldb.NewLevelDBPersistence(baseDirectory+"/high_watermarks_by_fingerprint", *highWatermarkCacheSize, 10)
emission.MetricHighWatermarks, err = NewLevelDBHighWatermarker(&LevelDBHighWatermarkerOptions{
LevelDBOptions: leveldb.LevelDBOptions{
Name: "High Watermarks",
Purpose: "The youngest sample in the database per metric.",
Path: baseDirectory + "/high_watermarks_by_fingerprint",
CacheSizeBytes: *highWatermarkCacheSize,
}})
workers.MayFail(err)
},
},
@ -127,7 +156,14 @@ func NewLevelDBMetricPersistence(baseDirectory string) (*LevelDBMetricPersistenc
"Fingerprints by Label Name",
func() {
var err error
emission.labelNameToFingerprints, err = leveldb.NewLevelDBPersistence(baseDirectory+"/fingerprints_by_label_name", *labelNameToFingerprintsCacheSize, 10)
emission.labelNameToFingerprints, err = NewLevelLabelNameFingerprintIndex(&LevelDBLabelNameFingerprintIndexOptions{
LevelDBOptions: leveldb.LevelDBOptions{
Name: "Fingerprints by Label Name",
Purpose: "Index",
Path: baseDirectory + "/fingerprints_by_label_name",
CacheSizeBytes: *labelNameToFingerprintsCacheSize,
},
})
workers.MayFail(err)
},
},
@ -135,7 +171,14 @@ func NewLevelDBMetricPersistence(baseDirectory string) (*LevelDBMetricPersistenc
"Fingerprints by Label Name and Value Pair",
func() {
var err error
emission.labelSetToFingerprints, err = leveldb.NewLevelDBPersistence(baseDirectory+"/fingerprints_by_label_name_and_value_pair", *labelPairToFingerprintsCacheSize, 10)
emission.labelSetToFingerprints, err = NewLevelDBLabelSetFingerprintIndex(&LevelDBLabelSetFingerprintIndexOptions{
LevelDBOptions: leveldb.LevelDBOptions{
Name: "Fingerprints by Label Pair",
Purpose: "Index",
Path: baseDirectory + "/fingerprints_by_label_name_and_value_pair",
CacheSizeBytes: *labelPairToFingerprintsCacheSize,
},
})
workers.MayFail(err)
},
},
@ -143,7 +186,15 @@ func NewLevelDBMetricPersistence(baseDirectory string) (*LevelDBMetricPersistenc
"Metric Membership Index",
func() {
var err error
emission.metricMembershipIndex, err = index.NewLevelDBMembershipIndex(baseDirectory+"/metric_membership_index", *metricMembershipIndexCacheSize, 10)
emission.metricMembershipIndex, err = NewLevelDBMetricMembershipIndex(
&LevelDBMetricMembershipIndexOptions{
LevelDBOptions: leveldb.LevelDBOptions{
Name: "Metric Membership",
Purpose: "Index",
Path: baseDirectory + "/metric_membership_index",
CacheSizeBytes: *metricMembershipIndexCacheSize,
},
})
workers.MayFail(err)
},
},
@ -151,7 +202,13 @@ func NewLevelDBMetricPersistence(baseDirectory string) (*LevelDBMetricPersistenc
"Sample Curation Remarks",
func() {
var err error
emission.CurationRemarks, err = leveldb.NewLevelDBPersistence(baseDirectory+"/curation_remarks", *curationRemarksCacheSize, 10)
o := &leveldb.LevelDBOptions{
Name: "Sample Curation Remarks",
Purpose: "Ledger of Progress for Various Curators",
Path: baseDirectory + "/curation_remarks",
CacheSizeBytes: *curationRemarksCacheSize,
}
emission.CurationRemarks, err = leveldb.NewLevelDBPersistence(o)
workers.MayFail(err)
},
},
@ -222,19 +279,16 @@ func groupByFingerprint(samples clientmodel.Samples) map[clientmodel.Fingerprint
// findUnindexedMetrics scours the metric membership index for each given Metric
// in the keyspace and returns a map of Fingerprint-Metric pairs that are
// absent.
func (l *LevelDBMetricPersistence) findUnindexedMetrics(candidates map[clientmodel.Fingerprint]clientmodel.Metric) (unindexed map[clientmodel.Fingerprint]clientmodel.Metric, err error) {
func (l *LevelDBMetricPersistence) findUnindexedMetrics(candidates map[clientmodel.Fingerprint]clientmodel.Metric) (unindexed FingerprintMetricMapping, err error) {
defer func(begin time.Time) {
duration := time.Since(begin)
recordOutcome(duration, err, map[string]string{operation: findUnindexedMetrics, result: success}, map[string]string{operation: findUnindexedMetrics, result: failure})
}(time.Now())
unindexed = make(map[clientmodel.Fingerprint]clientmodel.Metric)
dto := &dto.Metric{}
unindexed = FingerprintMetricMapping{}
for fingerprint, metric := range candidates {
dumpMetric(dto, metric)
indexHas, err := l.hasIndexMetric(dto)
indexHas, err := l.hasIndexMetric(metric)
if err != nil {
return unindexed, err
}
@ -251,67 +305,47 @@ func (l *LevelDBMetricPersistence) findUnindexedMetrics(candidates map[clientmod
// the index to reflect the new state.
//
// This operation is idempotent.
func (l *LevelDBMetricPersistence) indexLabelNames(metrics map[clientmodel.Fingerprint]clientmodel.Metric) (err error) {
func (l *LevelDBMetricPersistence) indexLabelNames(metrics FingerprintMetricMapping) (err error) {
defer func(begin time.Time) {
duration := time.Since(begin)
recordOutcome(duration, err, map[string]string{operation: indexLabelNames, result: success}, map[string]string{operation: indexLabelNames, result: failure})
}(time.Now())
labelNameFingerprints := map[clientmodel.LabelName]utility.Set{}
retrieved := map[clientmodel.LabelName]utility.Set{}
for fingerprint, metric := range metrics {
for labelName := range metric {
fingerprintSet, ok := labelNameFingerprints[labelName]
fingerprintSet, ok := retrieved[labelName]
if !ok {
fingerprintSet = utility.Set{}
fingerprints, err := l.GetFingerprintsForLabelName(labelName)
if err != nil {
return err
}
fingerprintSet = utility.Set{}
retrieved[labelName] = fingerprintSet
for _, fingerprint := range fingerprints {
fingerprintSet.Add(*fingerprint)
}
}
fingerprintSet.Add(fingerprint)
labelNameFingerprints[labelName] = fingerprintSet
}
}
batch := leveldb.NewBatch()
defer batch.Close()
for labelName, fingerprintSet := range labelNameFingerprints {
fingerprints := clientmodel.Fingerprints{}
for e := range fingerprintSet {
fingerprint := e.(clientmodel.Fingerprint)
fingerprints = append(fingerprints, &fingerprint)
pending := LabelNameFingerprintMapping{}
for name, set := range retrieved {
fps := pending[name]
for fp := range set {
f := fp.(clientmodel.Fingerprint)
fps = append(fps, &f)
}
pending[name] = fps
}
sort.Sort(fingerprints)
key := &dto.LabelName{
Name: proto.String(string(labelName)),
}
value := new(dto.FingerprintCollection)
for _, fingerprint := range fingerprints {
f := new(dto.Fingerprint)
dumpFingerprint(f, fingerprint)
value.Member = append(value.Member, f)
}
batch.Put(key, value)
}
err = l.labelNameToFingerprints.Commit(batch)
if err != nil {
return
}
return
return l.labelNameToFingerprints.IndexBatch(pending)
}
// indexLabelPairs accumulates all label pair to fingerprint index entries for
@ -326,7 +360,7 @@ func (l *LevelDBMetricPersistence) indexLabelPairs(metrics map[clientmodel.Finge
recordOutcome(duration, err, map[string]string{operation: indexLabelPairs, result: success}, map[string]string{operation: indexLabelPairs, result: failure})
}(time.Now())
labelPairFingerprints := map[LabelPair]utility.Set{}
collection := map[LabelPair]utility.Set{}
for fingerprint, metric := range metrics {
for labelName, labelValue := range metric {
@ -334,113 +368,69 @@ func (l *LevelDBMetricPersistence) indexLabelPairs(metrics map[clientmodel.Finge
Name: labelName,
Value: labelValue,
}
fingerprintSet, ok := labelPairFingerprints[labelPair]
fingerprintSet, ok := collection[labelPair]
if !ok {
fingerprintSet = utility.Set{}
fingerprints, err := l.GetFingerprintsForLabelSet(clientmodel.LabelSet{
labelName: labelValue,
})
fingerprints, _, err := l.labelSetToFingerprints.Lookup(&labelPair)
if err != nil {
return err
}
fingerprintSet = utility.Set{}
for _, fingerprint := range fingerprints {
fingerprintSet.Add(*fingerprint)
}
collection[labelPair] = fingerprintSet
}
fingerprintSet.Add(fingerprint)
labelPairFingerprints[labelPair] = fingerprintSet
}
}
batch := leveldb.NewBatch()
defer batch.Close()
batch := LabelSetFingerprintMapping{}
for labelPair, fingerprintSet := range labelPairFingerprints {
fingerprints := clientmodel.Fingerprints{}
for e := range fingerprintSet {
fingerprint := e.(clientmodel.Fingerprint)
fingerprints = append(fingerprints, &fingerprint)
for pair, elements := range collection {
fps := batch[pair]
for element := range elements {
fp := element.(clientmodel.Fingerprint)
fps = append(fps, &fp)
}
batch[pair] = fps
}
sort.Sort(fingerprints)
key := &dto.LabelPair{
Name: proto.String(string(labelPair.Name)),
Value: proto.String(string(labelPair.Value)),
}
value := new(dto.FingerprintCollection)
for _, fingerprint := range fingerprints {
f := new(dto.Fingerprint)
dumpFingerprint(f, fingerprint)
value.Member = append(value.Member, f)
}
batch.Put(key, value)
}
err = l.labelSetToFingerprints.Commit(batch)
if err != nil {
return
}
return
return l.labelSetToFingerprints.IndexBatch(batch)
}
// indexFingerprints updates all of the Fingerprint to Metric reverse lookups
// in the index and then bulk updates.
//
// This operation is idempotent.
func (l *LevelDBMetricPersistence) indexFingerprints(metrics map[clientmodel.Fingerprint]clientmodel.Metric) (err error) {
func (l *LevelDBMetricPersistence) indexFingerprints(b FingerprintMetricMapping) (err error) {
defer func(begin time.Time) {
duration := time.Since(begin)
recordOutcome(duration, err, map[string]string{operation: indexFingerprints, result: success}, map[string]string{operation: indexFingerprints, result: failure})
}(time.Now())
batch := leveldb.NewBatch()
defer batch.Close()
for fingerprint, metric := range metrics {
f := new(dto.Fingerprint)
dumpFingerprint(f, &fingerprint)
m := &dto.Metric{}
dumpMetric(m, metric)
batch.Put(f, m)
}
err = l.fingerprintToMetrics.Commit(batch)
if err != nil {
return
}
return
return l.fingerprintToMetrics.IndexBatch(b)
}
var existenceIdentity = &dto.MembershipIndexValue{}
// indexMetrics takes groups of samples, determines which ones contain metrics
// that are unknown to the storage stack, and then proceeds to update all
// affected indices.
func (l *LevelDBMetricPersistence) indexMetrics(fingerprints map[clientmodel.Fingerprint]clientmodel.Metric) (err error) {
func (l *LevelDBMetricPersistence) indexMetrics(fingerprints FingerprintMetricMapping) (err error) {
defer func(begin time.Time) {
duration := time.Since(begin)
recordOutcome(duration, err, map[string]string{operation: indexMetrics, result: success}, map[string]string{operation: indexMetrics, result: failure})
}(time.Now())
var (
absentMetrics map[clientmodel.Fingerprint]clientmodel.Metric
)
absentMetrics, err = l.findUnindexedMetrics(fingerprints)
absentees, err := l.findUnindexedMetrics(fingerprints)
if err != nil {
return
}
if len(absentMetrics) == 0 {
if len(absentees) == 0 {
return
}
@ -449,42 +439,32 @@ func (l *LevelDBMetricPersistence) indexMetrics(fingerprints map[clientmodel.Fin
workers := utility.NewUncertaintyGroup(3)
go func() {
workers.MayFail(l.indexLabelNames(absentMetrics))
workers.MayFail(l.indexLabelNames(absentees))
}()
go func() {
workers.MayFail(l.indexLabelPairs(absentMetrics))
workers.MayFail(l.indexLabelPairs(absentees))
}()
go func() {
workers.MayFail(l.indexFingerprints(absentMetrics))
workers.MayFail(l.indexFingerprints(absentees))
}()
if !workers.Wait() {
return fmt.Errorf("Could not index due to %s", workers.Errors())
}
// If any of the preceding operations failed, we will have inconsistent
// indices. Thusly, the Metric membership index should NOT be updated, as
// its state is used to determine whether to bulk update the other indices.
// Given that those operations are idempotent, it is OK to repeat them;
// however, it will consume considerable amounts of time.
batch := leveldb.NewBatch()
defer batch.Close()
for _, metric := range absentMetrics {
m := &dto.Metric{}
dumpMetric(m, metric)
batch.Put(m, existenceIdentity)
if !workers.Wait() {
return fmt.Errorf("Could not index due to %s", workers.Errors())
}
err = l.metricMembershipIndex.Commit(batch)
if err != nil {
// Not critical but undesirable.
log.Println(err)
ms := []clientmodel.Metric{}
for _, m := range absentees {
ms = append(ms, m)
}
return
return l.metricMembershipIndex.IndexBatch(ms)
}
func (l *LevelDBMetricPersistence) refreshHighWatermarks(groups map[clientmodel.Fingerprint]clientmodel.Samples) (err error) {
@ -494,41 +474,16 @@ func (l *LevelDBMetricPersistence) refreshHighWatermarks(groups map[clientmodel.
recordOutcome(duration, err, map[string]string{operation: refreshHighWatermarks, result: success}, map[string]string{operation: refreshHighWatermarks, result: failure})
}(time.Now())
batch := leveldb.NewBatch()
defer batch.Close()
value := &dto.MetricHighWatermark{}
for fingerprint, samples := range groups {
value.Reset()
f := new(dto.Fingerprint)
dumpFingerprint(f, &fingerprint)
present, err := l.MetricHighWatermarks.Get(f, value)
if err != nil {
return err
}
newestSampleTimestamp := samples[len(samples)-1].Timestamp
if !present {
value.Timestamp = proto.Int64(newestSampleTimestamp.Unix())
batch.Put(f, value)
b := FingerprintHighWatermarkMapping{}
for fp, ss := range groups {
if len(ss) == 0 {
continue
}
// BUG(matt): Repace this with watermark management.
if newestSampleTimestamp.After(time.Unix(value.GetTimestamp(), 0)) {
value.Timestamp = proto.Int64(newestSampleTimestamp.Unix())
batch.Put(f, value)
}
b[fp] = ss[len(ss)-1].Timestamp
}
err = l.MetricHighWatermarks.Commit(batch)
if err != nil {
return err
}
return nil
return l.MetricHighWatermarks.UpdateBatch(b)
}
func (l *LevelDBMetricPersistence) AppendSamples(samples clientmodel.Samples) (err error) {
@ -543,7 +498,7 @@ func (l *LevelDBMetricPersistence) AppendSamples(samples clientmodel.Samples) (e
watermarkErrChan := make(chan error, 1)
go func(groups map[clientmodel.Fingerprint]clientmodel.Samples) {
metrics := map[clientmodel.Fingerprint]clientmodel.Metric{}
metrics := FingerprintMetricMapping{}
for fingerprint, samples := range groups {
metrics[fingerprint] = samples[0].Metric
@ -637,38 +592,34 @@ func extractSampleValues(i leveldb.Iterator) (Values, error) {
return NewValuesFromDTO(v), nil
}
func (l *LevelDBMetricPersistence) hasIndexMetric(dto *dto.Metric) (value bool, err error) {
func (l *LevelDBMetricPersistence) hasIndexMetric(m clientmodel.Metric) (value bool, err error) {
defer func(begin time.Time) {
duration := time.Since(begin)
recordOutcome(duration, err, map[string]string{operation: hasIndexMetric, result: success}, map[string]string{operation: hasIndexMetric, result: failure})
}(time.Now())
value, err = l.metricMembershipIndex.Has(dto)
return
return l.metricMembershipIndex.Has(m)
}
func (l *LevelDBMetricPersistence) HasLabelPair(dto *dto.LabelPair) (value bool, err error) {
func (l *LevelDBMetricPersistence) HasLabelPair(p *LabelPair) (value bool, err error) {
defer func(begin time.Time) {
duration := time.Since(begin)
recordOutcome(duration, err, map[string]string{operation: hasLabelPair, result: success}, map[string]string{operation: hasLabelPair, result: failure})
}(time.Now())
value, err = l.labelSetToFingerprints.Has(dto)
return
return l.labelSetToFingerprints.Has(p)
}
func (l *LevelDBMetricPersistence) HasLabelName(dto *dto.LabelName) (value bool, err error) {
func (l *LevelDBMetricPersistence) HasLabelName(n clientmodel.LabelName) (value bool, err error) {
defer func(begin time.Time) {
duration := time.Since(begin)
recordOutcome(duration, err, map[string]string{operation: hasLabelName, result: success}, map[string]string{operation: hasLabelName, result: failure})
}(time.Now())
value, err = l.labelNameToFingerprints.Has(dto)
value, err = l.labelNameToFingerprints.Has(n)
return
}
@ -681,29 +632,19 @@ func (l *LevelDBMetricPersistence) GetFingerprintsForLabelSet(labelSet clientmod
}(time.Now())
sets := []utility.Set{}
pair := &dto.LabelPair{}
unmarshaled := new(dto.FingerprintCollection)
for name, value := range labelSet {
pair.Reset()
unmarshaled.Reset()
pair.Name = proto.String(string(name))
pair.Value = proto.String(string(value))
present, err := l.labelSetToFingerprints.Get(pair, unmarshaled)
fps, _, err := l.labelSetToFingerprints.Lookup(&LabelPair{
Name: name,
Value: value,
})
if err != nil {
return fps, err
}
if !present {
return nil, nil
return nil, err
}
set := utility.Set{}
for _, m := range unmarshaled.Member {
fp := &clientmodel.Fingerprint{}
loadFingerprint(fp, m)
for _, fp := range fps {
set.Add(*fp)
}
@ -734,24 +675,10 @@ func (l *LevelDBMetricPersistence) GetFingerprintsForLabelName(labelName clientm
recordOutcome(duration, err, map[string]string{operation: getFingerprintsForLabelName, result: success}, map[string]string{operation: getFingerprintsForLabelName, result: failure})
}(time.Now())
unmarshaled := new(dto.FingerprintCollection)
d := &dto.LabelName{}
dumpLabelName(d, labelName)
present, err := l.labelNameToFingerprints.Get(d, unmarshaled)
if err != nil {
return nil, err
}
if !present {
return nil, nil
}
// TODO(matt): Update signature to work with ok.
fps, _, err = l.labelNameToFingerprints.Lookup(labelName)
for _, m := range unmarshaled.Member {
fp := &clientmodel.Fingerprint{}
loadFingerprint(fp, m)
fps = append(fps, fp)
}
return fps, nil
return fps, err
}
func (l *LevelDBMetricPersistence) GetMetricForFingerprint(f *clientmodel.Fingerprint) (m clientmodel.Metric, err error) {
@ -761,22 +688,8 @@ func (l *LevelDBMetricPersistence) GetMetricForFingerprint(f *clientmodel.Finger
recordOutcome(duration, err, map[string]string{operation: getMetricForFingerprint, result: success}, map[string]string{operation: getMetricForFingerprint, result: failure})
}(time.Now())
unmarshaled := &dto.Metric{}
d := new(dto.Fingerprint)
dumpFingerprint(d, f)
present, err := l.fingerprintToMetrics.Get(d, unmarshaled)
if err != nil {
return nil, err
}
if !present {
return nil, nil
}
m = clientmodel.Metric{}
for _, v := range unmarshaled.LabelPair {
m[clientmodel.LabelName(v.GetName())] = clientmodel.LabelValue(v.GetValue())
}
// TODO(matt): Update signature to work with ok.
m, _, err = l.fingerprintToMetrics.Lookup(f)
return m, nil
}
@ -855,14 +768,14 @@ func (l *LevelDBMetricPersistence) GetAllValuesForLabel(labelName clientmodel.La
//
// Beware that it would probably be imprudent to run this on a live user-facing
// server due to latency implications.
func (l *LevelDBMetricPersistence) CompactKeyspaces() {
l.CurationRemarks.CompactKeyspace()
l.fingerprintToMetrics.CompactKeyspace()
l.labelNameToFingerprints.CompactKeyspace()
l.labelSetToFingerprints.CompactKeyspace()
l.MetricHighWatermarks.CompactKeyspace()
l.metricMembershipIndex.CompactKeyspace()
l.MetricSamples.CompactKeyspace()
func (l *LevelDBMetricPersistence) Prune() {
l.CurationRemarks.Prune()
l.fingerprintToMetrics.Prune()
l.labelNameToFingerprints.Prune()
l.labelSetToFingerprints.Prune()
l.MetricHighWatermarks.Prune()
l.metricMembershipIndex.Prune()
l.MetricSamples.Prune()
}
func (l *LevelDBMetricPersistence) ApproximateSizes() (total uint64, err error) {
@ -873,27 +786,27 @@ func (l *LevelDBMetricPersistence) ApproximateSizes() (total uint64, err error)
}
total += size
if size, err = l.fingerprintToMetrics.ApproximateSize(); err != nil {
if size, _, err = l.fingerprintToMetrics.Size(); err != nil {
return 0, err
}
total += size
if size, err = l.labelNameToFingerprints.ApproximateSize(); err != nil {
if size, _, err = l.labelNameToFingerprints.Size(); err != nil {
return 0, err
}
total += size
if size, err = l.labelSetToFingerprints.ApproximateSize(); err != nil {
if size, _, err = l.labelSetToFingerprints.Size(); err != nil {
return 0, err
}
total += size
if size, err = l.MetricHighWatermarks.ApproximateSize(); err != nil {
if size, _, err = l.MetricHighWatermarks.Size(); err != nil {
return 0, err
}
total += size
if size, err = l.metricMembershipIndex.ApproximateSize(); err != nil {
if size, _, err = l.metricMembershipIndex.Size(); err != nil {
return 0, err
}
total += size
@ -906,43 +819,14 @@ func (l *LevelDBMetricPersistence) ApproximateSizes() (total uint64, err error)
return total, nil
}
func (l *LevelDBMetricPersistence) States() []leveldb.DatabaseState {
states := []leveldb.DatabaseState{}
state := l.CurationRemarks.State()
state.Name = "Curation Remarks"
state.Type = "Watermark"
states = append(states, state)
state = l.fingerprintToMetrics.State()
state.Name = "Fingerprints to Metrics"
state.Type = "Index"
states = append(states, state)
state = l.labelNameToFingerprints.State()
state.Name = "Label Name to Fingerprints"
state.Type = "Inverted Index"
states = append(states, state)
state = l.labelSetToFingerprints.State()
state.Name = "Label Pair to Fingerprints"
state.Type = "Inverted Index"
states = append(states, state)
state = l.MetricHighWatermarks.State()
state.Name = "Metric Last Write"
state.Type = "Watermark"
states = append(states, state)
state = l.metricMembershipIndex.State()
state.Name = "Metric Membership"
state.Type = "Index"
states = append(states, state)
state = l.MetricSamples.State()
state.Name = "Samples"
state.Type = "Time Series"
states = append(states, state)
return states
func (l *LevelDBMetricPersistence) States() raw.DatabaseStates {
return raw.DatabaseStates{
l.CurationRemarks.State(),
l.fingerprintToMetrics.State(),
l.labelNameToFingerprints.State(),
l.labelSetToFingerprints.State(),
l.MetricHighWatermarks.State(),
l.metricMembershipIndex.State(),
l.MetricSamples.State(),
}
}

View file

@ -1820,7 +1820,7 @@ func TestGetValuesAlongRangeOp(t *testing.T) {
t.Fatalf("%d. expected length %d, got %d: %v", i, len(scenario.out), len(actual), actual)
}
for j, out := range scenario.out {
if out != actual[j] {
if !out.Equal(actual[j]) {
t.Fatalf("%d. expected output %v, got %v", i, scenario.out, actual)
}
}

View file

@ -848,19 +848,26 @@ func TestCuratorCompactionProcessor(t *testing.T) {
sampleDirectory := fixture.NewPreparer(t).Prepare("sample", fixture.NewCassetteFactory(scenario.in.sampleGroups))
defer sampleDirectory.Close()
curatorStates, err := leveldb.NewLevelDBPersistence(curatorDirectory.Path(), 0, 0)
curatorStates, err := leveldb.NewLevelDBPersistence(&leveldb.LevelDBOptions{
Path: curatorDirectory.Path(),
})
if err != nil {
t.Fatal(err)
}
defer curatorStates.Close()
watermarkStates, err := leveldb.NewLevelDBPersistence(watermarkDirectory.Path(), 0, 0)
watermarkStates, err := NewLevelDBHighWatermarker(&LevelDBHighWatermarkerOptions{
LevelDBOptions: leveldb.LevelDBOptions{
Path: watermarkDirectory.Path(),
},
})
if err != nil {
t.Fatal(err)
}
defer watermarkStates.Close()
samples, err := leveldb.NewLevelDBPersistence(sampleDirectory.Path(), 0, 0)
samples, err := leveldb.NewLevelDBPersistence(&leveldb.LevelDBOptions{
Path: sampleDirectory.Path(),
})
if err != nil {
t.Fatal(err)
}
@ -1367,19 +1374,24 @@ func TestCuratorDeletionProcessor(t *testing.T) {
sampleDirectory := fixture.NewPreparer(t).Prepare("sample", fixture.NewCassetteFactory(scenario.in.sampleGroups))
defer sampleDirectory.Close()
curatorStates, err := leveldb.NewLevelDBPersistence(curatorDirectory.Path(), 0, 0)
curatorStates, err := leveldb.NewLevelDBPersistence(&leveldb.LevelDBOptions{
Path: curatorDirectory.Path()})
if err != nil {
t.Fatal(err)
}
defer curatorStates.Close()
watermarkStates, err := leveldb.NewLevelDBPersistence(watermarkDirectory.Path(), 0, 0)
watermarkStates, err := NewLevelDBHighWatermarker(&LevelDBHighWatermarkerOptions{
LevelDBOptions: leveldb.LevelDBOptions{
Path: watermarkDirectory.Path(),
},
})
if err != nil {
t.Fatal(err)
}
defer watermarkStates.Close()
samples, err := leveldb.NewLevelDBPersistence(sampleDirectory.Path(), 0, 0)
samples, err := leveldb.NewLevelDBPersistence(&leveldb.LevelDBOptions{Path: sampleDirectory.Path()})
if err != nil {
t.Fatal(err)
}

View file

@ -331,7 +331,7 @@ func (t *TieredStorage) seriesTooOld(f *clientmodel.Fingerprint, i time.Time) (b
value := &dto.MetricHighWatermark{}
k := &dto.Fingerprint{}
dumpFingerprint(k, f)
diskHit, err := t.DiskStorage.MetricHighWatermarks.Get(k, value)
_, diskHit, err := t.DiskStorage.MetricHighWatermarks.Get(f)
if err != nil {
return false, err
}

View file

@ -15,6 +15,7 @@ package metric
import (
"container/list"
"io"
"sync"
"time"
@ -23,6 +24,10 @@ import (
clientmodel "github.com/prometheus/client_golang/model"
dto "github.com/prometheus/prometheus/model/generated"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/storage/raw"
"github.com/prometheus/prometheus/storage/raw/leveldb"
)
// unsafe.Sizeof(watermarks{})
@ -162,3 +167,104 @@ func (lru *WatermarkCache) checkCapacity() {
lru.size -= elementSize
}
}
type FingerprintHighWatermarkMapping map[clientmodel.Fingerprint]time.Time
type HighWatermarker interface {
io.Closer
raw.ForEacher
raw.Pruner
UpdateBatch(FingerprintHighWatermarkMapping) error
Get(*clientmodel.Fingerprint) (t time.Time, ok bool, err error)
State() *raw.DatabaseState
Size() (uint64, bool, error)
}
type LeveldbHighWatermarker struct {
p *leveldb.LevelDBPersistence
}
func (w *LeveldbHighWatermarker) Get(f *clientmodel.Fingerprint) (t time.Time, ok bool, err error) {
k := new(dto.Fingerprint)
dumpFingerprint(k, f)
v := new(dto.MetricHighWatermark)
ok, err = w.p.Get(k, v)
if err != nil {
return t, ok, err
}
if !ok {
return t, ok, err
}
t = time.Unix(v.GetTimestamp(), 0)
return t, true, nil
}
func (w *LeveldbHighWatermarker) UpdateBatch(m FingerprintHighWatermarkMapping) error {
batch := leveldb.NewBatch()
defer batch.Close()
for fp, t := range m {
existing, present, err := w.Get(&fp)
if err != nil {
return err
}
k := new(dto.Fingerprint)
dumpFingerprint(k, &fp)
v := new(dto.MetricHighWatermark)
if !present {
v.Timestamp = proto.Int64(t.Unix())
batch.Put(k, v)
continue
}
// BUG(matt): Replace this with watermark management.
if t.After(existing) {
v.Timestamp = proto.Int64(t.Unix())
batch.Put(k, v)
}
}
return w.p.Commit(batch)
}
func (i *LeveldbHighWatermarker) ForEach(d storage.RecordDecoder, f storage.RecordFilter, o storage.RecordOperator) (bool, error) {
return i.p.ForEach(d, f, o)
}
func (i *LeveldbHighWatermarker) Prune() (bool, error) {
i.p.Prune()
return false, nil
}
func (i *LeveldbHighWatermarker) Close() error {
i.p.Close()
return nil
}
func (i *LeveldbHighWatermarker) State() *raw.DatabaseState {
return i.p.State()
}
func (i *LeveldbHighWatermarker) Size() (uint64, bool, error) {
s, err := i.p.ApproximateSize()
return s, true, err
}
type LevelDBHighWatermarkerOptions struct {
leveldb.LevelDBOptions
}
func NewLevelDBHighWatermarker(o *LevelDBHighWatermarkerOptions) (HighWatermarker, error) {
s, err := leveldb.NewLevelDBPersistence(&o.LevelDBOptions)
if err != nil {
return nil, err
}
return &LeveldbHighWatermarker{
p: s,
}, nil
}

View file

@ -22,7 +22,7 @@ import (
"github.com/prometheus/prometheus/storage/raw/leveldb"
)
var existenceValue = &dto.MembershipIndexValue{}
var existenceValue = new(dto.MembershipIndexValue)
type LevelDBMembershipIndex struct {
persistence *leveldb.LevelDBPersistence
@ -44,18 +44,19 @@ func (l *LevelDBMembershipIndex) Put(k proto.Message) error {
return l.persistence.Put(k, existenceValue)
}
func NewLevelDBMembershipIndex(storageRoot string, cacheCapacity, bitsPerBloomFilterEncoded int) (i *LevelDBMembershipIndex, err error) {
type LevelDBIndexOptions struct {
leveldb.LevelDBOptions
}
leveldbPersistence, err := leveldb.NewLevelDBPersistence(storageRoot, cacheCapacity, bitsPerBloomFilterEncoded)
func NewLevelDBMembershipIndex(o *LevelDBIndexOptions) (i *LevelDBMembershipIndex, err error) {
leveldbPersistence, err := leveldb.NewLevelDBPersistence(&o.LevelDBOptions)
if err != nil {
return
return nil, err
}
i = &LevelDBMembershipIndex{
return &LevelDBMembershipIndex{
persistence: leveldbPersistence,
}
return
}, nil
}
func (l *LevelDBMembershipIndex) Commit(batch raw.Batch) error {
@ -66,14 +67,14 @@ func (l *LevelDBMembershipIndex) Commit(batch raw.Batch) error {
//
// Beware that it would probably be imprudent to run this on a live user-facing
// server due to latency implications.
func (l *LevelDBMembershipIndex) CompactKeyspace() {
l.persistence.CompactKeyspace()
func (l *LevelDBMembershipIndex) Prune() {
l.persistence.Prune()
}
func (l *LevelDBMembershipIndex) ApproximateSize() (uint64, error) {
return l.persistence.ApproximateSize()
}
func (l *LevelDBMembershipIndex) State() leveldb.DatabaseState {
func (l *LevelDBMembershipIndex) State() *raw.DatabaseState {
return l.persistence.State()
}

View file

@ -19,9 +19,23 @@ import (
"github.com/prometheus/prometheus/storage"
)
type ForEacher interface {
// ForEach is responsible for iterating through all records in the database
// until one of the following conditions are met:
//
// 1.) A system anomaly in the database scan.
// 2.) The last record in the database is reached.
// 3.) A FilterResult of STOP is emitted by the Filter.
//
// Decoding errors for an entity cause that entity to be skipped.
ForEach(storage.RecordDecoder, storage.RecordFilter, storage.RecordOperator) (scannedEntireCorpus bool, err error)
}
// Persistence models a key-value store for bytes that supports various
// additional operations.
type Persistence interface {
ForEacher
// Close reaps all of the underlying system resources associated with this
// persistence.
Close()
@ -34,15 +48,6 @@ type Persistence interface {
Drop(key proto.Message) error
// Put sets the key to a given value.
Put(key, value proto.Message) error
// ForEach is responsible for iterating through all records in the database
// until one of the following conditions are met:
//
// 1.) A system anomaly in the database scan.
// 2.) The last record in the database is reached.
// 3.) A FilterResult of STOP is emitted by the Filter.
//
// Decoding errors for an entity cause that entity to be skipped.
ForEach(storage.RecordDecoder, storage.RecordFilter, storage.RecordOperator) (scannedEntireCorpus bool, err error)
// Commit applies the Batch operations to the database.
Commit(Batch) error
}
@ -59,3 +64,7 @@ type Batch interface {
// Drop follows the same protocol as Persistence.Drop.
Drop(key proto.Message)
}
type Pruner interface {
Prune() (noop bool, err error)
}

View file

@ -14,7 +14,6 @@
package leveldb
import (
"flag"
"fmt"
"time"
@ -26,16 +25,11 @@ import (
"github.com/prometheus/prometheus/storage/raw"
)
var (
leveldbFlushOnMutate = flag.Bool("leveldbFlushOnMutate", false, "Whether LevelDB should flush every operation to disk upon mutation before returning (bool).")
leveldbUseSnappy = flag.Bool("leveldbUseSnappy", true, "Whether LevelDB attempts to use Snappy for compressing elements (bool).")
leveldbUseParanoidChecks = flag.Bool("leveldbUseParanoidChecks", false, "Whether LevelDB uses expensive checks (bool).")
maximumOpenFiles = flag.Int("leveldb.maximumOpenFiles", 128, "The maximum number of files each LevelDB may maintain.")
)
// LevelDBPersistence is a disk-backed sorted key-value store.
type LevelDBPersistence struct {
path string
name string
purpose string
cache *levigo.Cache
filterPolicy *levigo.FilterPolicy
@ -169,25 +163,47 @@ func (i levigoIterator) GetError() (err error) {
return i.iterator.GetError()
}
func NewLevelDBPersistence(storageRoot string, cacheCapacity, bitsPerBloomFilterEncoded int) (*LevelDBPersistence, error) {
type Compression uint
const (
Snappy Compression = iota
Uncompressed
)
type LevelDBOptions struct {
Path string
Name string
Purpose string
CacheSizeBytes int
OpenFileAllowance int
FlushOnMutate bool
UseParanoidChecks bool
Compression Compression
}
func NewLevelDBPersistence(o *LevelDBOptions) (*LevelDBPersistence, error) {
options := levigo.NewOptions()
options.SetCreateIfMissing(true)
options.SetParanoidChecks(*leveldbUseParanoidChecks)
compression := levigo.NoCompression
if *leveldbUseSnappy {
compression = levigo.SnappyCompression
options.SetParanoidChecks(o.UseParanoidChecks)
compression := levigo.SnappyCompression
if o.Compression == Uncompressed {
compression = levigo.NoCompression
}
options.SetCompression(compression)
cache := levigo.NewLRUCache(cacheCapacity)
cache := levigo.NewLRUCache(o.CacheSizeBytes)
options.SetCache(cache)
filterPolicy := levigo.NewBloomFilter(bitsPerBloomFilterEncoded)
filterPolicy := levigo.NewBloomFilter(10)
options.SetFilterPolicy(filterPolicy)
options.SetMaxOpenFiles(*maximumOpenFiles)
options.SetMaxOpenFiles(o.OpenFileAllowance)
storage, err := levigo.Open(storageRoot, options)
storage, err := levigo.Open(o.Path, options)
if err != nil {
return nil, err
}
@ -195,10 +211,12 @@ func NewLevelDBPersistence(storageRoot string, cacheCapacity, bitsPerBloomFilter
readOptions := levigo.NewReadOptions()
writeOptions := levigo.NewWriteOptions()
writeOptions.SetSync(*leveldbFlushOnMutate)
writeOptions.SetSync(o.FlushOnMutate)
return &LevelDBPersistence{
path: storageRoot,
path: o.Path,
name: o.Name,
purpose: o.Purpose,
cache: cache,
filterPolicy: filterPolicy,
@ -303,7 +321,7 @@ func (l *LevelDBPersistence) Commit(b raw.Batch) (err error) {
//
// Beware that it would probably be imprudent to run this on a live user-facing
// server due to latency implications.
func (l *LevelDBPersistence) CompactKeyspace() {
func (l *LevelDBPersistence) Prune() {
// Magic values per https://code.google.com/p/leveldb/source/browse/include/leveldb/db.h#131.
keyspace := levigo.Range{

View file

@ -14,8 +14,8 @@
package leveldb
import (
"github.com/prometheus/prometheus/storage/raw"
"github.com/prometheus/prometheus/utility"
"time"
)
const (
@ -23,32 +23,22 @@ const (
sstablesKey = "leveldb.sstables"
)
// DatabaseState models a bundle of metadata about a LevelDB database used in
// template format string interpolation.
type DatabaseState struct {
LastRefreshed time.Time
Type string
Name string
Path string
LowLevelStatus string
SSTablesStatus string
ApproximateSize utility.ByteSize
Error error
}
func (l *LevelDBPersistence) State() DatabaseState {
databaseState := DatabaseState{
LastRefreshed: time.Now(),
Path: l.path,
LowLevelStatus: l.storage.PropertyValue(statsKey),
SSTablesStatus: l.storage.PropertyValue(sstablesKey),
func (l *LevelDBPersistence) State() *raw.DatabaseState {
databaseState := &raw.DatabaseState{
Location: l.path,
Name: l.name,
Purpose: l.purpose,
Supplemental: map[string]string{},
}
if size, err := l.ApproximateSize(); err != nil {
databaseState.Error = err
databaseState.Supplemental["Errors"] = err.Error()
} else {
databaseState.ApproximateSize = utility.ByteSize(size)
databaseState.Size = utility.ByteSize(size)
}
databaseState.Supplemental["Low Level"] = l.storage.PropertyValue(statsKey)
databaseState.Supplemental["SSTable"] = l.storage.PropertyValue(sstablesKey)
return databaseState
}

View file

@ -20,10 +20,7 @@ import (
"github.com/prometheus/prometheus/utility/test"
)
const (
cacheCapacity = 0
bitsPerBloomFilterEncoded = 0
)
const cacheCapacity = 0
type (
// Pair models a prospective (key, value) double that will be committed to
@ -64,7 +61,11 @@ type (
func (p preparer) Prepare(n string, f FixtureFactory) (t test.TemporaryDirectory) {
t = test.NewTemporaryDirectory(n, p.tester)
persistence, err := leveldb.NewLevelDBPersistence(t.Path(), cacheCapacity, bitsPerBloomFilterEncoded)
o := &leveldb.LevelDBOptions{
Path: t.Path(),
CacheSizeBytes: cacheCapacity,
}
persistence, err := leveldb.NewLevelDBPersistence(o)
if err != nil {
defer t.Close()
p.tester.Fatal(err)

53
storage/raw/state.go Normal file
View file

@ -0,0 +1,53 @@
// Copyright 2013 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package raw
import (
"github.com/prometheus/prometheus/utility"
)
type DatabaseState struct {
Name string
Size utility.ByteSize
Location string
Purpose string
Supplemental map[string]string
}
type DatabaseStates []*DatabaseState
func (s DatabaseStates) Len() int {
return len(s)
}
func (s DatabaseStates) Less(i, j int) bool {
l := s[i]
r := s[j]
if l.Name > r.Name {
return false
}
if l.Name < r.Name {
return true
}
return l.Size < r.Size
}
func (s DatabaseStates) Swap(i, j int) {
s[i], s[j] = s[j], s[i]
}

View file

@ -44,7 +44,7 @@ func main() {
log.Printf("Starting compaction...")
size, _ := persistences.ApproximateSizes()
log.Printf("Original Size: %d", size)
persistences.CompactKeyspaces()
persistences.Prune()
log.Printf("Finished in %s", time.Since(start))
size, _ = persistences.ApproximateSizes()
log.Printf("New Size: %d", size)

View file

@ -14,29 +14,47 @@
package web
import (
"github.com/prometheus/prometheus/storage/raw/leveldb"
"net/http"
"sync"
"time"
"github.com/prometheus/prometheus/storage/raw"
)
type DatabasesHandler struct {
States []leveldb.DatabaseState
type DatabaseStatesProvider interface {
States() raw.DatabaseStates
}
Incoming chan []leveldb.DatabaseState
type DatabasesHandler struct {
RefreshInterval time.Duration
NextRefresh time.Time
Current raw.DatabaseStates
Provider DatabaseStatesProvider
mutex sync.RWMutex
}
func (h *DatabasesHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
select {
case states := <-h.Incoming:
h.mutex.Lock()
defer h.mutex.Unlock()
h.States = states
default:
h.Refresh()
h.mutex.RLock()
defer h.mutex.RUnlock()
}
executeTemplate(w, "databases", h)
}
func (h *DatabasesHandler) Refresh() {
h.mutex.RLock()
if !time.Now().After(h.NextRefresh) {
h.mutex.RUnlock()
return
}
h.mutex.RUnlock()
h.mutex.Lock()
defer h.mutex.Unlock()
h.Current = h.Provider.States()
h.NextRefresh = time.Now().Add(h.RefreshInterval)
}

View file

@ -3,34 +3,28 @@
{{define "content"}}
<div class="container-fluid">
<h2>Database Information</h2>
{{range .States}}
<h3>{{.Name}}</h3>
{{range $database := .Current }}
<h3>{{$database.Name}}</h3>
<table class="table table-bordered table-condensed table-hover literal_output">
<tbody>
<tr>
<th>Path</th>
<td>{{.Path}}</td>
<th>Location</th>
<td>{{$database.Location}}</td>
</tr>
<tr>
<th>Last Refreshed</th>
<td>{{.LastRefreshed}}</td>
<th>Purpose</th>
<td>{{$database.Purpose}}</td>
</tr>
<tr>
<th>Type</th>
<td>{{.Type}}</td>
<th>Size</th>
<td>{{$database.Size}}</td>
</tr>
{{range $subject, $state := $database.Supplemental }}
<tr>
<th>Approximate Size</th>
<td>{{.ApproximateSize}}</td>
</tr>
<tr>
<th>Low Level Status</th>
<td><pre>{{.LowLevelStatus}}</pre></td>
</tr>
<tr>
<th>SSTable Status</th>
<td><pre>{{.SSTablesStatus}}</pre></td>
<th>{{$subject}}</th>
<td><pre>{{$state}}</pre></td>
</tr>
{{end}}
</tbody>
</table>
{{end}}