diff --git a/Makefile b/Makefile index 32cbbb561..efab12315 100644 --- a/Makefile +++ b/Makefile @@ -17,7 +17,7 @@ include Makefile.INCLUDE all: binary test -$(GOCC): $(BUILD_PATH)/cache/$(GOPKG) source_path +$(GOCC): $(BUILD_PATH)/cache/$(GOPKG) $(FULL_GOPATH) tar -C $(BUILD_PATH)/root -xzf $< touch $@ @@ -58,7 +58,7 @@ format: model: dependencies preparation $(MAKE) -C model -preparation: $(GOCC) source_path +preparation: $(GOCC) $(FULL_GOPATH) $(MAKE) -C $(BUILD_PATH) race_condition_binary: build @@ -76,15 +76,14 @@ search_index: server: config dependencies model preparation $(MAKE) -C server -# source_path is responsible for ensuring that the builder has not done anything +# $(FULL_GOPATH) is responsible for ensuring that the builder has not done anything # stupid like working on Prometheus outside of ${GOPATH}. -source_path: +$(FULL_GOPATH): -[ -d "$(FULL_GOPATH)" ] || { mkdir -vp $(FULL_GOPATH_BASE) ; ln -s "$(PWD)" "$(FULL_GOPATH)" ; } [ -d "$(FULL_GOPATH)" ] test: build - $(GOENV) find . -maxdepth 1 -mindepth 1 -type d -and -not -path $(BUILD_PATH) -exec $(GOCC) test {}/... $(GO_TEST_FLAGS) \; - $(GO) test $(GO_TEST_FLAGS) + $(GO) test $(GO_TEST_FLAGS) ./... tools: dependencies preparation $(MAKE) -C tools @@ -95,4 +94,4 @@ update: web: config dependencies model preparation $(MAKE) -C web -.PHONY: advice binary build clean config dependencies documentation format model package preparation race_condition_binary race_condition_run run search_index source_path test tools update +.PHONY: advice binary build clean config dependencies documentation format model package preparation race_condition_binary race_condition_run run search_index test tools update diff --git a/main.go b/main.go index 8ab97759d..022b886ab 100644 --- a/main.go +++ b/main.go @@ -28,7 +28,7 @@ import ( "github.com/prometheus/prometheus/retrieval" "github.com/prometheus/prometheus/rules" "github.com/prometheus/prometheus/storage/metric" - "github.com/prometheus/prometheus/storage/raw/leveldb" + "github.com/prometheus/prometheus/web" "github.com/prometheus/prometheus/web/api" ) @@ -69,14 +69,13 @@ var ( ) type prometheus struct { - headCompactionTimer *time.Ticker - bodyCompactionTimer *time.Ticker - tailCompactionTimer *time.Ticker - deletionTimer *time.Ticker - reportDatabasesTimer *time.Ticker + headCompactionTimer *time.Ticker + bodyCompactionTimer *time.Ticker + tailCompactionTimer *time.Ticker + deletionTimer *time.Ticker + curationMutex sync.Mutex curationState chan metric.CurationState - databaseStates chan []leveldb.DatabaseState stopBackgroundOperations chan bool unwrittenSamples chan *extraction.Result @@ -142,10 +141,6 @@ func (p *prometheus) close() { p.deletionTimer.Stop() } - if p.reportDatabasesTimer != nil { - p.reportDatabasesTimer.Stop() - } - if len(p.stopBackgroundOperations) == 0 { p.stopBackgroundOperations <- true } @@ -158,26 +153,6 @@ func (p *prometheus) close() { close(p.notifications) close(p.stopBackgroundOperations) close(p.curationState) - close(p.databaseStates) -} - -func (p *prometheus) reportDatabaseState() { - for _ = range p.reportDatabasesTimer.C { - // BUG(matt): Per Julius, ... - // These channel magic tricks confuse me and seem a bit awkward just to - // pass a status around. Now that we have Go 1.1, would it be maybe be - // nicer to pass ts.DiskStorage.States as a method value - // (http://tip.golang.org/ref/spec#Method_values) to the web layer - // instead of doing this? - select { - case <-p.databaseStates: - // Reset the future database state if nobody consumes it. - case p.databaseStates <- p.storage.DiskStorage.States(): - // Set the database state so someone can consume it if they want. - default: - // Don't block. - } - } } func main() { @@ -207,7 +182,6 @@ func main() { unwrittenSamples := make(chan *extraction.Result, *samplesQueueCapacity) curationState := make(chan metric.CurationState, 1) - databaseStates := make(chan []leveldb.DatabaseState, 1) // Coprime numbers, fool! headCompactionTimer := time.NewTicker(*headCompactInterval) bodyCompactionTimer := time.NewTicker(*bodyCompactInterval) @@ -255,7 +229,7 @@ func main() { } databasesHandler := &web.DatabasesHandler{ - Incoming: databaseStates, + Provider: ts.DiskStorage, } metricsService := &api.MetricsService{ @@ -278,10 +252,7 @@ func main() { deletionTimer: deletionTimer, - reportDatabasesTimer: time.NewTicker(15 * time.Minute), - - curationState: curationState, - databaseStates: databaseStates, + curationState: curationState, unwrittenSamples: unwrittenSamples, @@ -298,7 +269,6 @@ func main() { <-storageStarted go prometheus.interruptHandler() - go prometheus.reportDatabaseState() go func() { for _ = range prometheus.headCompactionTimer.C { diff --git a/storage/metric/curator.go b/storage/metric/curator.go index a37201971..564cd89d4 100644 --- a/storage/metric/curator.go +++ b/storage/metric/curator.go @@ -92,7 +92,7 @@ type watermarkScanner struct { // curated. // curationState is the on-disk store where the curation remarks are made for // how much progress has been made. -func (c *Curator) Run(ignoreYoungerThan time.Duration, instant time.Time, processor Processor, curationState, samples, watermarks *leveldb.LevelDBPersistence, status chan CurationState) (err error) { +func (c *Curator) Run(ignoreYoungerThan time.Duration, instant time.Time, processor Processor, curationState, samples *leveldb.LevelDBPersistence, watermarks HighWatermarker, status chan CurationState) (err error) { defer func(t time.Time) { duration := float64(time.Since(t) / time.Millisecond) diff --git a/storage/metric/index.go b/storage/metric/index.go new file mode 100644 index 000000000..9ed8b8412 --- /dev/null +++ b/storage/metric/index.go @@ -0,0 +1,406 @@ +// Copyright 2013 Prometheus Team +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package metric + +import ( + "io" + "sort" + + "code.google.com/p/goprotobuf/proto" + + clientmodel "github.com/prometheus/client_golang/model" + + dto "github.com/prometheus/prometheus/model/generated" + + "github.com/prometheus/prometheus/storage" + "github.com/prometheus/prometheus/storage/raw" + "github.com/prometheus/prometheus/storage/raw/leveldb" +) + +type FingerprintMetricMapping map[clientmodel.Fingerprint]clientmodel.Metric + +type FingerprintMetricIndex interface { + io.Closer + raw.Pruner + + IndexBatch(FingerprintMetricMapping) error + Lookup(*clientmodel.Fingerprint) (m clientmodel.Metric, ok bool, err error) + State() *raw.DatabaseState + Size() (s uint64, present bool, err error) +} + +type LeveldbFingerprintMetricIndex struct { + p *leveldb.LevelDBPersistence +} + +type LevelDBFingerprintMetricIndexOptions struct { + leveldb.LevelDBOptions +} + +func (i *LeveldbFingerprintMetricIndex) Close() error { + i.p.Close() + + return nil +} + +func (i *LeveldbFingerprintMetricIndex) State() *raw.DatabaseState { + return i.p.State() +} + +func (i *LeveldbFingerprintMetricIndex) Size() (uint64, bool, error) { + s, err := i.p.ApproximateSize() + return s, true, err +} + +func (i *LeveldbFingerprintMetricIndex) IndexBatch(mapping FingerprintMetricMapping) error { + b := leveldb.NewBatch() + defer b.Close() + + for f, m := range mapping { + k := new(dto.Fingerprint) + dumpFingerprint(k, &f) + v := new(dto.Metric) + dumpMetric(v, m) + + b.Put(k, v) + } + + return i.p.Commit(b) +} + +func (i *LeveldbFingerprintMetricIndex) Lookup(f *clientmodel.Fingerprint) (m clientmodel.Metric, ok bool, err error) { + k := new(dto.Fingerprint) + dumpFingerprint(k, f) + v := new(dto.Metric) + if ok, err := i.p.Get(k, v); !ok { + return nil, false, nil + } else if err != nil { + return nil, false, err + } + + m = clientmodel.Metric{} + + for _, pair := range v.LabelPair { + m[clientmodel.LabelName(pair.GetName())] = clientmodel.LabelValue(pair.GetValue()) + } + + return m, true, nil +} + +func (i *LeveldbFingerprintMetricIndex) Prune() (bool, error) { + i.p.Prune() + + return false, nil +} + +func NewLevelDBFingerprintMetricIndex(o *LevelDBFingerprintMetricIndexOptions) (FingerprintMetricIndex, error) { + s, err := leveldb.NewLevelDBPersistence(&o.LevelDBOptions) + if err != nil { + return nil, err + } + + return &LeveldbFingerprintMetricIndex{ + p: s, + }, nil +} + +type LabelNameFingerprintMapping map[clientmodel.LabelName]clientmodel.Fingerprints + +type LabelNameFingerprintIndex interface { + io.Closer + raw.Pruner + + IndexBatch(LabelNameFingerprintMapping) error + Lookup(clientmodel.LabelName) (fps clientmodel.Fingerprints, ok bool, err error) + Has(clientmodel.LabelName) (ok bool, err error) + State() *raw.DatabaseState + Size() (s uint64, present bool, err error) +} + +type LeveldbLabelNameFingerprintIndex struct { + p *leveldb.LevelDBPersistence +} + +func (i *LeveldbLabelNameFingerprintIndex) IndexBatch(b LabelNameFingerprintMapping) error { + batch := leveldb.NewBatch() + defer batch.Close() + + for labelName, fingerprints := range b { + sort.Sort(fingerprints) + + key := &dto.LabelName{ + Name: proto.String(string(labelName)), + } + value := new(dto.FingerprintCollection) + for _, fingerprint := range fingerprints { + f := new(dto.Fingerprint) + dumpFingerprint(f, fingerprint) + value.Member = append(value.Member, f) + } + + batch.Put(key, value) + } + + return i.p.Commit(batch) +} + +func (i *LeveldbLabelNameFingerprintIndex) Lookup(l clientmodel.LabelName) (fps clientmodel.Fingerprints, ok bool, err error) { + k := new(dto.LabelName) + dumpLabelName(k, l) + v := new(dto.FingerprintCollection) + ok, err = i.p.Get(k, v) + if err != nil { + return nil, false, err + } + if !ok { + return nil, false, nil + } + + for _, m := range v.Member { + fp := new(clientmodel.Fingerprint) + loadFingerprint(fp, m) + fps = append(fps, fp) + } + + return fps, true, nil +} + +func (i *LeveldbLabelNameFingerprintIndex) Has(l clientmodel.LabelName) (ok bool, err error) { + return i.p.Has(&dto.LabelName{ + Name: proto.String(string(l)), + }) +} + +func (i *LeveldbLabelNameFingerprintIndex) Prune() (bool, error) { + i.p.Prune() + + return false, nil +} + +func (i *LeveldbLabelNameFingerprintIndex) Close() error { + i.p.Close() + + return nil +} + +func (i *LeveldbLabelNameFingerprintIndex) Size() (uint64, bool, error) { + s, err := i.p.ApproximateSize() + return s, true, err +} + +func (i *LeveldbLabelNameFingerprintIndex) State() *raw.DatabaseState { + return i.p.State() +} + +type LevelDBLabelNameFingerprintIndexOptions struct { + leveldb.LevelDBOptions +} + +func NewLevelLabelNameFingerprintIndex(o *LevelDBLabelNameFingerprintIndexOptions) (LabelNameFingerprintIndex, error) { + s, err := leveldb.NewLevelDBPersistence(&o.LevelDBOptions) + if err != nil { + return nil, err + } + + return &LeveldbLabelNameFingerprintIndex{ + p: s, + }, nil +} + +type LabelSetFingerprintMapping map[LabelPair]clientmodel.Fingerprints + +type LabelSetFingerprintIndex interface { + io.Closer + raw.ForEacher + raw.Pruner + + IndexBatch(LabelSetFingerprintMapping) error + Lookup(*LabelPair) (m clientmodel.Fingerprints, ok bool, err error) + Has(*LabelPair) (ok bool, err error) + State() *raw.DatabaseState + Size() (s uint64, present bool, err error) +} + +type LeveldbLabelSetFingerprintIndex struct { + p *leveldb.LevelDBPersistence +} + +type LevelDBLabelSetFingerprintIndexOptions struct { + leveldb.LevelDBOptions +} + +func (i *LeveldbLabelSetFingerprintIndex) IndexBatch(m LabelSetFingerprintMapping) error { + batch := leveldb.NewBatch() + defer batch.Close() + + for pair, fps := range m { + sort.Sort(fps) + + key := &dto.LabelPair{ + Name: proto.String(string(pair.Name)), + Value: proto.String(string(pair.Value)), + } + value := new(dto.FingerprintCollection) + for _, fp := range fps { + f := new(dto.Fingerprint) + dumpFingerprint(f, fp) + value.Member = append(value.Member, f) + } + + batch.Put(key, value) + } + + return i.p.Commit(batch) +} + +func (i *LeveldbLabelSetFingerprintIndex) Lookup(p *LabelPair) (m clientmodel.Fingerprints, ok bool, err error) { + k := &dto.LabelPair{ + Name: proto.String(string(p.Name)), + Value: proto.String(string(p.Value)), + } + v := new(dto.FingerprintCollection) + + ok, err = i.p.Get(k, v) + + if !ok { + return nil, false, nil + } + if err != nil { + return nil, false, err + } + + for _, pair := range v.Member { + fp := new(clientmodel.Fingerprint) + loadFingerprint(fp, pair) + m = append(m, fp) + } + + return m, true, nil +} + +func (i *LeveldbLabelSetFingerprintIndex) Has(p *LabelPair) (ok bool, err error) { + k := &dto.LabelPair{ + Name: proto.String(string(p.Name)), + Value: proto.String(string(p.Value)), + } + + return i.p.Has(k) +} + +func (i *LeveldbLabelSetFingerprintIndex) ForEach(d storage.RecordDecoder, f storage.RecordFilter, o storage.RecordOperator) (bool, error) { + return i.p.ForEach(d, f, o) +} + +func (i *LeveldbLabelSetFingerprintIndex) Prune() (bool, error) { + i.p.Prune() + return false, nil +} + +func (i *LeveldbLabelSetFingerprintIndex) Close() error { + i.p.Close() + + return nil +} + +func (i *LeveldbLabelSetFingerprintIndex) Size() (uint64, bool, error) { + s, err := i.p.ApproximateSize() + return s, true, err +} + +func (i *LeveldbLabelSetFingerprintIndex) State() *raw.DatabaseState { + return i.p.State() +} + +func NewLevelDBLabelSetFingerprintIndex(o *LevelDBLabelSetFingerprintIndexOptions) (LabelSetFingerprintIndex, error) { + s, err := leveldb.NewLevelDBPersistence(&o.LevelDBOptions) + if err != nil { + return nil, err + } + + return &LeveldbLabelSetFingerprintIndex{ + p: s, + }, nil +} + +type MetricMembershipIndex interface { + io.Closer + raw.Pruner + + IndexBatch([]clientmodel.Metric) error + Has(clientmodel.Metric) (ok bool, err error) + State() *raw.DatabaseState + Size() (s uint64, present bool, err error) +} + +type LeveldbMetricMembershipIndex struct { + p *leveldb.LevelDBPersistence +} + +var existenceIdentity = new(dto.MembershipIndexValue) + +func (i *LeveldbMetricMembershipIndex) IndexBatch(ms []clientmodel.Metric) error { + batch := leveldb.NewBatch() + defer batch.Close() + + for _, m := range ms { + k := new(dto.Metric) + dumpMetric(k, m) + batch.Put(k, existenceIdentity) + } + + return i.p.Commit(batch) +} + +func (i *LeveldbMetricMembershipIndex) Has(m clientmodel.Metric) (ok bool, err error) { + k := new(dto.Metric) + dumpMetric(k, m) + + return i.p.Has(k) +} + +func (i *LeveldbMetricMembershipIndex) Close() error { + i.p.Close() + + return nil +} + +func (i *LeveldbMetricMembershipIndex) Size() (uint64, bool, error) { + s, err := i.p.ApproximateSize() + return s, true, err +} + +func (i *LeveldbMetricMembershipIndex) State() *raw.DatabaseState { + return i.p.State() +} + +func (i *LeveldbMetricMembershipIndex) Prune() (bool, error) { + i.p.Prune() + + return false, nil +} + +type LevelDBMetricMembershipIndexOptions struct { + leveldb.LevelDBOptions +} + +func NewLevelDBMetricMembershipIndex(o *LevelDBMetricMembershipIndexOptions) (MetricMembershipIndex, error) { + s, err := leveldb.NewLevelDBPersistence(&o.LevelDBOptions) + if err != nil { + return nil, err + } + + return &LeveldbMetricMembershipIndex{ + p: s, + }, nil +} diff --git a/storage/metric/leveldb.go b/storage/metric/leveldb.go index d4474f0a6..6b251ba3b 100644 --- a/storage/metric/leveldb.go +++ b/storage/metric/leveldb.go @@ -26,9 +26,9 @@ import ( clientmodel "github.com/prometheus/client_golang/model" dto "github.com/prometheus/prometheus/model/generated" - index "github.com/prometheus/prometheus/storage/raw/index/leveldb" "github.com/prometheus/prometheus/storage" + "github.com/prometheus/prometheus/storage/raw" "github.com/prometheus/prometheus/storage/raw/leveldb" "github.com/prometheus/prometheus/utility" ) @@ -37,11 +37,11 @@ const sortConcurrency = 2 type LevelDBMetricPersistence struct { CurationRemarks *leveldb.LevelDBPersistence - fingerprintToMetrics *leveldb.LevelDBPersistence - labelNameToFingerprints *leveldb.LevelDBPersistence - labelSetToFingerprints *leveldb.LevelDBPersistence - MetricHighWatermarks *leveldb.LevelDBPersistence - metricMembershipIndex *index.LevelDBMembershipIndex + fingerprintToMetrics FingerprintMetricIndex + labelNameToFingerprints LabelNameFingerprintIndex + labelSetToFingerprints LabelSetFingerprintIndex + MetricHighWatermarks HighWatermarker + metricMembershipIndex MetricMembershipIndex MetricSamples *leveldb.LevelDBPersistence } @@ -60,12 +60,15 @@ var ( ) type leveldbOpener func() -type leveldbCloser interface { +type errorCloser interface { + Close() error +} +type closer interface { Close() } func (l *LevelDBMetricPersistence) Close() { - var persistences = []leveldbCloser{ + var persistences = []interface{}{ l.CurationRemarks, l.fingerprintToMetrics, l.labelNameToFingerprints, @@ -77,14 +80,21 @@ func (l *LevelDBMetricPersistence) Close() { closerGroup := sync.WaitGroup{} - for _, closer := range persistences { + for _, c := range persistences { closerGroup.Add(1) - go func(closer leveldbCloser) { - if closer != nil { - closer.Close() + go func(c interface{}) { + if c != nil { + switch closer := c.(type) { + case closer: + closer.Close() + case errorCloser: + if err := closer.Close(); err != nil { + log.Println("anomaly closing:", err) + } + } } closerGroup.Done() - }(closer) + }(c) } closerGroup.Wait() @@ -103,7 +113,14 @@ func NewLevelDBMetricPersistence(baseDirectory string) (*LevelDBMetricPersistenc "Label Names and Value Pairs by Fingerprint", func() { var err error - emission.fingerprintToMetrics, err = leveldb.NewLevelDBPersistence(baseDirectory+"/label_name_and_value_pairs_by_fingerprint", *fingerprintsToLabelPairCacheSize, 10) + emission.fingerprintToMetrics, err = NewLevelDBFingerprintMetricIndex(&LevelDBFingerprintMetricIndexOptions{ + LevelDBOptions: leveldb.LevelDBOptions{ + Name: "Metrics by Fingerprint", + Purpose: "Index", + Path: baseDirectory + "/label_name_and_value_pairs_by_fingerprint", + CacheSizeBytes: *fingerprintsToLabelPairCacheSize, + }, + }) workers.MayFail(err) }, }, @@ -111,7 +128,13 @@ func NewLevelDBMetricPersistence(baseDirectory string) (*LevelDBMetricPersistenc "Samples by Fingerprint", func() { var err error - emission.MetricSamples, err = leveldb.NewLevelDBPersistence(baseDirectory+"/samples_by_fingerprint", *samplesByFingerprintCacheSize, 10) + o := &leveldb.LevelDBOptions{ + Name: "Samples", + Purpose: "Timeseries", + Path: baseDirectory + "/samples_by_fingerprint", + CacheSizeBytes: *fingerprintsToLabelPairCacheSize, + } + emission.MetricSamples, err = leveldb.NewLevelDBPersistence(o) workers.MayFail(err) }, }, @@ -119,7 +142,13 @@ func NewLevelDBMetricPersistence(baseDirectory string) (*LevelDBMetricPersistenc "High Watermarks by Fingerprint", func() { var err error - emission.MetricHighWatermarks, err = leveldb.NewLevelDBPersistence(baseDirectory+"/high_watermarks_by_fingerprint", *highWatermarkCacheSize, 10) + emission.MetricHighWatermarks, err = NewLevelDBHighWatermarker(&LevelDBHighWatermarkerOptions{ + LevelDBOptions: leveldb.LevelDBOptions{ + Name: "High Watermarks", + Purpose: "The youngest sample in the database per metric.", + Path: baseDirectory + "/high_watermarks_by_fingerprint", + CacheSizeBytes: *highWatermarkCacheSize, + }}) workers.MayFail(err) }, }, @@ -127,7 +156,14 @@ func NewLevelDBMetricPersistence(baseDirectory string) (*LevelDBMetricPersistenc "Fingerprints by Label Name", func() { var err error - emission.labelNameToFingerprints, err = leveldb.NewLevelDBPersistence(baseDirectory+"/fingerprints_by_label_name", *labelNameToFingerprintsCacheSize, 10) + emission.labelNameToFingerprints, err = NewLevelLabelNameFingerprintIndex(&LevelDBLabelNameFingerprintIndexOptions{ + LevelDBOptions: leveldb.LevelDBOptions{ + Name: "Fingerprints by Label Name", + Purpose: "Index", + Path: baseDirectory + "/fingerprints_by_label_name", + CacheSizeBytes: *labelNameToFingerprintsCacheSize, + }, + }) workers.MayFail(err) }, }, @@ -135,7 +171,14 @@ func NewLevelDBMetricPersistence(baseDirectory string) (*LevelDBMetricPersistenc "Fingerprints by Label Name and Value Pair", func() { var err error - emission.labelSetToFingerprints, err = leveldb.NewLevelDBPersistence(baseDirectory+"/fingerprints_by_label_name_and_value_pair", *labelPairToFingerprintsCacheSize, 10) + emission.labelSetToFingerprints, err = NewLevelDBLabelSetFingerprintIndex(&LevelDBLabelSetFingerprintIndexOptions{ + LevelDBOptions: leveldb.LevelDBOptions{ + Name: "Fingerprints by Label Pair", + Purpose: "Index", + Path: baseDirectory + "/fingerprints_by_label_name_and_value_pair", + CacheSizeBytes: *labelPairToFingerprintsCacheSize, + }, + }) workers.MayFail(err) }, }, @@ -143,7 +186,15 @@ func NewLevelDBMetricPersistence(baseDirectory string) (*LevelDBMetricPersistenc "Metric Membership Index", func() { var err error - emission.metricMembershipIndex, err = index.NewLevelDBMembershipIndex(baseDirectory+"/metric_membership_index", *metricMembershipIndexCacheSize, 10) + emission.metricMembershipIndex, err = NewLevelDBMetricMembershipIndex( + &LevelDBMetricMembershipIndexOptions{ + LevelDBOptions: leveldb.LevelDBOptions{ + Name: "Metric Membership", + Purpose: "Index", + Path: baseDirectory + "/metric_membership_index", + CacheSizeBytes: *metricMembershipIndexCacheSize, + }, + }) workers.MayFail(err) }, }, @@ -151,7 +202,13 @@ func NewLevelDBMetricPersistence(baseDirectory string) (*LevelDBMetricPersistenc "Sample Curation Remarks", func() { var err error - emission.CurationRemarks, err = leveldb.NewLevelDBPersistence(baseDirectory+"/curation_remarks", *curationRemarksCacheSize, 10) + o := &leveldb.LevelDBOptions{ + Name: "Sample Curation Remarks", + Purpose: "Ledger of Progress for Various Curators", + Path: baseDirectory + "/curation_remarks", + CacheSizeBytes: *curationRemarksCacheSize, + } + emission.CurationRemarks, err = leveldb.NewLevelDBPersistence(o) workers.MayFail(err) }, }, @@ -222,19 +279,16 @@ func groupByFingerprint(samples clientmodel.Samples) map[clientmodel.Fingerprint // findUnindexedMetrics scours the metric membership index for each given Metric // in the keyspace and returns a map of Fingerprint-Metric pairs that are // absent. -func (l *LevelDBMetricPersistence) findUnindexedMetrics(candidates map[clientmodel.Fingerprint]clientmodel.Metric) (unindexed map[clientmodel.Fingerprint]clientmodel.Metric, err error) { +func (l *LevelDBMetricPersistence) findUnindexedMetrics(candidates map[clientmodel.Fingerprint]clientmodel.Metric) (unindexed FingerprintMetricMapping, err error) { defer func(begin time.Time) { duration := time.Since(begin) recordOutcome(duration, err, map[string]string{operation: findUnindexedMetrics, result: success}, map[string]string{operation: findUnindexedMetrics, result: failure}) }(time.Now()) - unindexed = make(map[clientmodel.Fingerprint]clientmodel.Metric) - - dto := &dto.Metric{} + unindexed = FingerprintMetricMapping{} for fingerprint, metric := range candidates { - dumpMetric(dto, metric) - indexHas, err := l.hasIndexMetric(dto) + indexHas, err := l.hasIndexMetric(metric) if err != nil { return unindexed, err } @@ -251,67 +305,47 @@ func (l *LevelDBMetricPersistence) findUnindexedMetrics(candidates map[clientmod // the index to reflect the new state. // // This operation is idempotent. -func (l *LevelDBMetricPersistence) indexLabelNames(metrics map[clientmodel.Fingerprint]clientmodel.Metric) (err error) { +func (l *LevelDBMetricPersistence) indexLabelNames(metrics FingerprintMetricMapping) (err error) { defer func(begin time.Time) { duration := time.Since(begin) recordOutcome(duration, err, map[string]string{operation: indexLabelNames, result: success}, map[string]string{operation: indexLabelNames, result: failure}) }(time.Now()) - labelNameFingerprints := map[clientmodel.LabelName]utility.Set{} + retrieved := map[clientmodel.LabelName]utility.Set{} for fingerprint, metric := range metrics { for labelName := range metric { - fingerprintSet, ok := labelNameFingerprints[labelName] + fingerprintSet, ok := retrieved[labelName] if !ok { - fingerprintSet = utility.Set{} - fingerprints, err := l.GetFingerprintsForLabelName(labelName) if err != nil { return err } + fingerprintSet = utility.Set{} + retrieved[labelName] = fingerprintSet + for _, fingerprint := range fingerprints { fingerprintSet.Add(*fingerprint) } } fingerprintSet.Add(fingerprint) - labelNameFingerprints[labelName] = fingerprintSet } } - batch := leveldb.NewBatch() - defer batch.Close() - - for labelName, fingerprintSet := range labelNameFingerprints { - fingerprints := clientmodel.Fingerprints{} - for e := range fingerprintSet { - fingerprint := e.(clientmodel.Fingerprint) - fingerprints = append(fingerprints, &fingerprint) + pending := LabelNameFingerprintMapping{} + for name, set := range retrieved { + fps := pending[name] + for fp := range set { + f := fp.(clientmodel.Fingerprint) + fps = append(fps, &f) } - - sort.Sort(fingerprints) - - key := &dto.LabelName{ - Name: proto.String(string(labelName)), - } - value := new(dto.FingerprintCollection) - for _, fingerprint := range fingerprints { - f := new(dto.Fingerprint) - dumpFingerprint(f, fingerprint) - value.Member = append(value.Member, f) - } - - batch.Put(key, value) + pending[name] = fps } - err = l.labelNameToFingerprints.Commit(batch) - if err != nil { - return - } - - return + return l.labelNameToFingerprints.IndexBatch(pending) } // indexLabelPairs accumulates all label pair to fingerprint index entries for @@ -326,7 +360,7 @@ func (l *LevelDBMetricPersistence) indexLabelPairs(metrics map[clientmodel.Finge recordOutcome(duration, err, map[string]string{operation: indexLabelPairs, result: success}, map[string]string{operation: indexLabelPairs, result: failure}) }(time.Now()) - labelPairFingerprints := map[LabelPair]utility.Set{} + collection := map[LabelPair]utility.Set{} for fingerprint, metric := range metrics { for labelName, labelValue := range metric { @@ -334,113 +368,69 @@ func (l *LevelDBMetricPersistence) indexLabelPairs(metrics map[clientmodel.Finge Name: labelName, Value: labelValue, } - fingerprintSet, ok := labelPairFingerprints[labelPair] + fingerprintSet, ok := collection[labelPair] if !ok { - fingerprintSet = utility.Set{} - - fingerprints, err := l.GetFingerprintsForLabelSet(clientmodel.LabelSet{ - labelName: labelValue, - }) + fingerprints, _, err := l.labelSetToFingerprints.Lookup(&labelPair) if err != nil { return err } + fingerprintSet = utility.Set{} for _, fingerprint := range fingerprints { fingerprintSet.Add(*fingerprint) } + + collection[labelPair] = fingerprintSet } fingerprintSet.Add(fingerprint) - labelPairFingerprints[labelPair] = fingerprintSet } } - batch := leveldb.NewBatch() - defer batch.Close() + batch := LabelSetFingerprintMapping{} - for labelPair, fingerprintSet := range labelPairFingerprints { - fingerprints := clientmodel.Fingerprints{} - for e := range fingerprintSet { - fingerprint := e.(clientmodel.Fingerprint) - fingerprints = append(fingerprints, &fingerprint) + for pair, elements := range collection { + fps := batch[pair] + for element := range elements { + fp := element.(clientmodel.Fingerprint) + fps = append(fps, &fp) } - - sort.Sort(fingerprints) - - key := &dto.LabelPair{ - Name: proto.String(string(labelPair.Name)), - Value: proto.String(string(labelPair.Value)), - } - value := new(dto.FingerprintCollection) - for _, fingerprint := range fingerprints { - f := new(dto.Fingerprint) - dumpFingerprint(f, fingerprint) - value.Member = append(value.Member, f) - } - - batch.Put(key, value) + batch[pair] = fps } - err = l.labelSetToFingerprints.Commit(batch) - if err != nil { - return - } - - return + return l.labelSetToFingerprints.IndexBatch(batch) } // indexFingerprints updates all of the Fingerprint to Metric reverse lookups // in the index and then bulk updates. // // This operation is idempotent. -func (l *LevelDBMetricPersistence) indexFingerprints(metrics map[clientmodel.Fingerprint]clientmodel.Metric) (err error) { +func (l *LevelDBMetricPersistence) indexFingerprints(b FingerprintMetricMapping) (err error) { defer func(begin time.Time) { duration := time.Since(begin) recordOutcome(duration, err, map[string]string{operation: indexFingerprints, result: success}, map[string]string{operation: indexFingerprints, result: failure}) }(time.Now()) - batch := leveldb.NewBatch() - defer batch.Close() - - for fingerprint, metric := range metrics { - f := new(dto.Fingerprint) - dumpFingerprint(f, &fingerprint) - m := &dto.Metric{} - dumpMetric(m, metric) - batch.Put(f, m) - } - - err = l.fingerprintToMetrics.Commit(batch) - if err != nil { - return - } - - return + return l.fingerprintToMetrics.IndexBatch(b) } -var existenceIdentity = &dto.MembershipIndexValue{} - // indexMetrics takes groups of samples, determines which ones contain metrics // that are unknown to the storage stack, and then proceeds to update all // affected indices. -func (l *LevelDBMetricPersistence) indexMetrics(fingerprints map[clientmodel.Fingerprint]clientmodel.Metric) (err error) { +func (l *LevelDBMetricPersistence) indexMetrics(fingerprints FingerprintMetricMapping) (err error) { defer func(begin time.Time) { duration := time.Since(begin) recordOutcome(duration, err, map[string]string{operation: indexMetrics, result: success}, map[string]string{operation: indexMetrics, result: failure}) }(time.Now()) - var ( - absentMetrics map[clientmodel.Fingerprint]clientmodel.Metric - ) - - absentMetrics, err = l.findUnindexedMetrics(fingerprints) + absentees, err := l.findUnindexedMetrics(fingerprints) if err != nil { return } - if len(absentMetrics) == 0 { + if len(absentees) == 0 { return } @@ -449,42 +439,32 @@ func (l *LevelDBMetricPersistence) indexMetrics(fingerprints map[clientmodel.Fin workers := utility.NewUncertaintyGroup(3) go func() { - workers.MayFail(l.indexLabelNames(absentMetrics)) + workers.MayFail(l.indexLabelNames(absentees)) }() go func() { - workers.MayFail(l.indexLabelPairs(absentMetrics)) + workers.MayFail(l.indexLabelPairs(absentees)) }() go func() { - workers.MayFail(l.indexFingerprints(absentMetrics)) + workers.MayFail(l.indexFingerprints(absentees)) }() - if !workers.Wait() { - return fmt.Errorf("Could not index due to %s", workers.Errors()) - } - // If any of the preceding operations failed, we will have inconsistent // indices. Thusly, the Metric membership index should NOT be updated, as // its state is used to determine whether to bulk update the other indices. // Given that those operations are idempotent, it is OK to repeat them; // however, it will consume considerable amounts of time. - batch := leveldb.NewBatch() - defer batch.Close() - - for _, metric := range absentMetrics { - m := &dto.Metric{} - dumpMetric(m, metric) - batch.Put(m, existenceIdentity) + if !workers.Wait() { + return fmt.Errorf("Could not index due to %s", workers.Errors()) } - err = l.metricMembershipIndex.Commit(batch) - if err != nil { - // Not critical but undesirable. - log.Println(err) + ms := []clientmodel.Metric{} + for _, m := range absentees { + ms = append(ms, m) } - return + return l.metricMembershipIndex.IndexBatch(ms) } func (l *LevelDBMetricPersistence) refreshHighWatermarks(groups map[clientmodel.Fingerprint]clientmodel.Samples) (err error) { @@ -494,41 +474,16 @@ func (l *LevelDBMetricPersistence) refreshHighWatermarks(groups map[clientmodel. recordOutcome(duration, err, map[string]string{operation: refreshHighWatermarks, result: success}, map[string]string{operation: refreshHighWatermarks, result: failure}) }(time.Now()) - batch := leveldb.NewBatch() - defer batch.Close() - - value := &dto.MetricHighWatermark{} - for fingerprint, samples := range groups { - value.Reset() - f := new(dto.Fingerprint) - dumpFingerprint(f, &fingerprint) - present, err := l.MetricHighWatermarks.Get(f, value) - if err != nil { - return err - } - - newestSampleTimestamp := samples[len(samples)-1].Timestamp - - if !present { - value.Timestamp = proto.Int64(newestSampleTimestamp.Unix()) - batch.Put(f, value) - + b := FingerprintHighWatermarkMapping{} + for fp, ss := range groups { + if len(ss) == 0 { continue } - // BUG(matt): Repace this with watermark management. - if newestSampleTimestamp.After(time.Unix(value.GetTimestamp(), 0)) { - value.Timestamp = proto.Int64(newestSampleTimestamp.Unix()) - batch.Put(f, value) - } + b[fp] = ss[len(ss)-1].Timestamp } - err = l.MetricHighWatermarks.Commit(batch) - if err != nil { - return err - } - - return nil + return l.MetricHighWatermarks.UpdateBatch(b) } func (l *LevelDBMetricPersistence) AppendSamples(samples clientmodel.Samples) (err error) { @@ -543,7 +498,7 @@ func (l *LevelDBMetricPersistence) AppendSamples(samples clientmodel.Samples) (e watermarkErrChan := make(chan error, 1) go func(groups map[clientmodel.Fingerprint]clientmodel.Samples) { - metrics := map[clientmodel.Fingerprint]clientmodel.Metric{} + metrics := FingerprintMetricMapping{} for fingerprint, samples := range groups { metrics[fingerprint] = samples[0].Metric @@ -637,38 +592,34 @@ func extractSampleValues(i leveldb.Iterator) (Values, error) { return NewValuesFromDTO(v), nil } -func (l *LevelDBMetricPersistence) hasIndexMetric(dto *dto.Metric) (value bool, err error) { +func (l *LevelDBMetricPersistence) hasIndexMetric(m clientmodel.Metric) (value bool, err error) { defer func(begin time.Time) { duration := time.Since(begin) recordOutcome(duration, err, map[string]string{operation: hasIndexMetric, result: success}, map[string]string{operation: hasIndexMetric, result: failure}) }(time.Now()) - value, err = l.metricMembershipIndex.Has(dto) - - return + return l.metricMembershipIndex.Has(m) } -func (l *LevelDBMetricPersistence) HasLabelPair(dto *dto.LabelPair) (value bool, err error) { +func (l *LevelDBMetricPersistence) HasLabelPair(p *LabelPair) (value bool, err error) { defer func(begin time.Time) { duration := time.Since(begin) recordOutcome(duration, err, map[string]string{operation: hasLabelPair, result: success}, map[string]string{operation: hasLabelPair, result: failure}) }(time.Now()) - value, err = l.labelSetToFingerprints.Has(dto) - - return + return l.labelSetToFingerprints.Has(p) } -func (l *LevelDBMetricPersistence) HasLabelName(dto *dto.LabelName) (value bool, err error) { +func (l *LevelDBMetricPersistence) HasLabelName(n clientmodel.LabelName) (value bool, err error) { defer func(begin time.Time) { duration := time.Since(begin) recordOutcome(duration, err, map[string]string{operation: hasLabelName, result: success}, map[string]string{operation: hasLabelName, result: failure}) }(time.Now()) - value, err = l.labelNameToFingerprints.Has(dto) + value, err = l.labelNameToFingerprints.Has(n) return } @@ -681,29 +632,19 @@ func (l *LevelDBMetricPersistence) GetFingerprintsForLabelSet(labelSet clientmod }(time.Now()) sets := []utility.Set{} - pair := &dto.LabelPair{} - unmarshaled := new(dto.FingerprintCollection) for name, value := range labelSet { - pair.Reset() - unmarshaled.Reset() - - pair.Name = proto.String(string(name)) - pair.Value = proto.String(string(value)) - - present, err := l.labelSetToFingerprints.Get(pair, unmarshaled) + fps, _, err := l.labelSetToFingerprints.Lookup(&LabelPair{ + Name: name, + Value: value, + }) if err != nil { - return fps, err - } - if !present { - return nil, nil + return nil, err } set := utility.Set{} - for _, m := range unmarshaled.Member { - fp := &clientmodel.Fingerprint{} - loadFingerprint(fp, m) + for _, fp := range fps { set.Add(*fp) } @@ -734,24 +675,10 @@ func (l *LevelDBMetricPersistence) GetFingerprintsForLabelName(labelName clientm recordOutcome(duration, err, map[string]string{operation: getFingerprintsForLabelName, result: success}, map[string]string{operation: getFingerprintsForLabelName, result: failure}) }(time.Now()) - unmarshaled := new(dto.FingerprintCollection) - d := &dto.LabelName{} - dumpLabelName(d, labelName) - present, err := l.labelNameToFingerprints.Get(d, unmarshaled) - if err != nil { - return nil, err - } - if !present { - return nil, nil - } + // TODO(matt): Update signature to work with ok. + fps, _, err = l.labelNameToFingerprints.Lookup(labelName) - for _, m := range unmarshaled.Member { - fp := &clientmodel.Fingerprint{} - loadFingerprint(fp, m) - fps = append(fps, fp) - } - - return fps, nil + return fps, err } func (l *LevelDBMetricPersistence) GetMetricForFingerprint(f *clientmodel.Fingerprint) (m clientmodel.Metric, err error) { @@ -761,22 +688,8 @@ func (l *LevelDBMetricPersistence) GetMetricForFingerprint(f *clientmodel.Finger recordOutcome(duration, err, map[string]string{operation: getMetricForFingerprint, result: success}, map[string]string{operation: getMetricForFingerprint, result: failure}) }(time.Now()) - unmarshaled := &dto.Metric{} - d := new(dto.Fingerprint) - dumpFingerprint(d, f) - present, err := l.fingerprintToMetrics.Get(d, unmarshaled) - if err != nil { - return nil, err - } - if !present { - return nil, nil - } - - m = clientmodel.Metric{} - - for _, v := range unmarshaled.LabelPair { - m[clientmodel.LabelName(v.GetName())] = clientmodel.LabelValue(v.GetValue()) - } + // TODO(matt): Update signature to work with ok. + m, _, err = l.fingerprintToMetrics.Lookup(f) return m, nil } @@ -855,14 +768,14 @@ func (l *LevelDBMetricPersistence) GetAllValuesForLabel(labelName clientmodel.La // // Beware that it would probably be imprudent to run this on a live user-facing // server due to latency implications. -func (l *LevelDBMetricPersistence) CompactKeyspaces() { - l.CurationRemarks.CompactKeyspace() - l.fingerprintToMetrics.CompactKeyspace() - l.labelNameToFingerprints.CompactKeyspace() - l.labelSetToFingerprints.CompactKeyspace() - l.MetricHighWatermarks.CompactKeyspace() - l.metricMembershipIndex.CompactKeyspace() - l.MetricSamples.CompactKeyspace() +func (l *LevelDBMetricPersistence) Prune() { + l.CurationRemarks.Prune() + l.fingerprintToMetrics.Prune() + l.labelNameToFingerprints.Prune() + l.labelSetToFingerprints.Prune() + l.MetricHighWatermarks.Prune() + l.metricMembershipIndex.Prune() + l.MetricSamples.Prune() } func (l *LevelDBMetricPersistence) ApproximateSizes() (total uint64, err error) { @@ -873,27 +786,27 @@ func (l *LevelDBMetricPersistence) ApproximateSizes() (total uint64, err error) } total += size - if size, err = l.fingerprintToMetrics.ApproximateSize(); err != nil { + if size, _, err = l.fingerprintToMetrics.Size(); err != nil { return 0, err } total += size - if size, err = l.labelNameToFingerprints.ApproximateSize(); err != nil { + if size, _, err = l.labelNameToFingerprints.Size(); err != nil { return 0, err } total += size - if size, err = l.labelSetToFingerprints.ApproximateSize(); err != nil { + if size, _, err = l.labelSetToFingerprints.Size(); err != nil { return 0, err } total += size - if size, err = l.MetricHighWatermarks.ApproximateSize(); err != nil { + if size, _, err = l.MetricHighWatermarks.Size(); err != nil { return 0, err } total += size - if size, err = l.metricMembershipIndex.ApproximateSize(); err != nil { + if size, _, err = l.metricMembershipIndex.Size(); err != nil { return 0, err } total += size @@ -906,43 +819,14 @@ func (l *LevelDBMetricPersistence) ApproximateSizes() (total uint64, err error) return total, nil } -func (l *LevelDBMetricPersistence) States() []leveldb.DatabaseState { - states := []leveldb.DatabaseState{} - - state := l.CurationRemarks.State() - state.Name = "Curation Remarks" - state.Type = "Watermark" - states = append(states, state) - - state = l.fingerprintToMetrics.State() - state.Name = "Fingerprints to Metrics" - state.Type = "Index" - states = append(states, state) - - state = l.labelNameToFingerprints.State() - state.Name = "Label Name to Fingerprints" - state.Type = "Inverted Index" - states = append(states, state) - - state = l.labelSetToFingerprints.State() - state.Name = "Label Pair to Fingerprints" - state.Type = "Inverted Index" - states = append(states, state) - - state = l.MetricHighWatermarks.State() - state.Name = "Metric Last Write" - state.Type = "Watermark" - states = append(states, state) - - state = l.metricMembershipIndex.State() - state.Name = "Metric Membership" - state.Type = "Index" - states = append(states, state) - - state = l.MetricSamples.State() - state.Name = "Samples" - state.Type = "Time Series" - states = append(states, state) - - return states +func (l *LevelDBMetricPersistence) States() raw.DatabaseStates { + return raw.DatabaseStates{ + l.CurationRemarks.State(), + l.fingerprintToMetrics.State(), + l.labelNameToFingerprints.State(), + l.labelSetToFingerprints.State(), + l.MetricHighWatermarks.State(), + l.metricMembershipIndex.State(), + l.MetricSamples.State(), + } } diff --git a/storage/metric/operation_test.go b/storage/metric/operation_test.go index 6a44fd4ee..90978e65f 100644 --- a/storage/metric/operation_test.go +++ b/storage/metric/operation_test.go @@ -1820,7 +1820,7 @@ func TestGetValuesAlongRangeOp(t *testing.T) { t.Fatalf("%d. expected length %d, got %d: %v", i, len(scenario.out), len(actual), actual) } for j, out := range scenario.out { - if out != actual[j] { + if !out.Equal(actual[j]) { t.Fatalf("%d. expected output %v, got %v", i, scenario.out, actual) } } diff --git a/storage/metric/processor_test.go b/storage/metric/processor_test.go index 74bfb1e13..83ff8fc7e 100644 --- a/storage/metric/processor_test.go +++ b/storage/metric/processor_test.go @@ -848,19 +848,26 @@ func TestCuratorCompactionProcessor(t *testing.T) { sampleDirectory := fixture.NewPreparer(t).Prepare("sample", fixture.NewCassetteFactory(scenario.in.sampleGroups)) defer sampleDirectory.Close() - curatorStates, err := leveldb.NewLevelDBPersistence(curatorDirectory.Path(), 0, 0) + curatorStates, err := leveldb.NewLevelDBPersistence(&leveldb.LevelDBOptions{ + Path: curatorDirectory.Path(), + }) if err != nil { t.Fatal(err) } - defer curatorStates.Close() - watermarkStates, err := leveldb.NewLevelDBPersistence(watermarkDirectory.Path(), 0, 0) + watermarkStates, err := NewLevelDBHighWatermarker(&LevelDBHighWatermarkerOptions{ + LevelDBOptions: leveldb.LevelDBOptions{ + Path: watermarkDirectory.Path(), + }, + }) if err != nil { t.Fatal(err) } defer watermarkStates.Close() - samples, err := leveldb.NewLevelDBPersistence(sampleDirectory.Path(), 0, 0) + samples, err := leveldb.NewLevelDBPersistence(&leveldb.LevelDBOptions{ + Path: sampleDirectory.Path(), + }) if err != nil { t.Fatal(err) } @@ -1367,19 +1374,24 @@ func TestCuratorDeletionProcessor(t *testing.T) { sampleDirectory := fixture.NewPreparer(t).Prepare("sample", fixture.NewCassetteFactory(scenario.in.sampleGroups)) defer sampleDirectory.Close() - curatorStates, err := leveldb.NewLevelDBPersistence(curatorDirectory.Path(), 0, 0) + curatorStates, err := leveldb.NewLevelDBPersistence(&leveldb.LevelDBOptions{ + Path: curatorDirectory.Path()}) if err != nil { t.Fatal(err) } defer curatorStates.Close() - watermarkStates, err := leveldb.NewLevelDBPersistence(watermarkDirectory.Path(), 0, 0) + watermarkStates, err := NewLevelDBHighWatermarker(&LevelDBHighWatermarkerOptions{ + LevelDBOptions: leveldb.LevelDBOptions{ + Path: watermarkDirectory.Path(), + }, + }) if err != nil { t.Fatal(err) } defer watermarkStates.Close() - samples, err := leveldb.NewLevelDBPersistence(sampleDirectory.Path(), 0, 0) + samples, err := leveldb.NewLevelDBPersistence(&leveldb.LevelDBOptions{Path: sampleDirectory.Path()}) if err != nil { t.Fatal(err) } diff --git a/storage/metric/tiered.go b/storage/metric/tiered.go index ce4f0c32f..d5870ae29 100644 --- a/storage/metric/tiered.go +++ b/storage/metric/tiered.go @@ -331,7 +331,7 @@ func (t *TieredStorage) seriesTooOld(f *clientmodel.Fingerprint, i time.Time) (b value := &dto.MetricHighWatermark{} k := &dto.Fingerprint{} dumpFingerprint(k, f) - diskHit, err := t.DiskStorage.MetricHighWatermarks.Get(k, value) + _, diskHit, err := t.DiskStorage.MetricHighWatermarks.Get(f) if err != nil { return false, err } diff --git a/storage/metric/watermark.go b/storage/metric/watermark.go index 9522419e4..e6b0ab3e3 100644 --- a/storage/metric/watermark.go +++ b/storage/metric/watermark.go @@ -15,6 +15,7 @@ package metric import ( "container/list" + "io" "sync" "time" @@ -23,6 +24,10 @@ import ( clientmodel "github.com/prometheus/client_golang/model" dto "github.com/prometheus/prometheus/model/generated" + + "github.com/prometheus/prometheus/storage" + "github.com/prometheus/prometheus/storage/raw" + "github.com/prometheus/prometheus/storage/raw/leveldb" ) // unsafe.Sizeof(watermarks{}) @@ -162,3 +167,104 @@ func (lru *WatermarkCache) checkCapacity() { lru.size -= elementSize } } + +type FingerprintHighWatermarkMapping map[clientmodel.Fingerprint]time.Time + +type HighWatermarker interface { + io.Closer + raw.ForEacher + raw.Pruner + + UpdateBatch(FingerprintHighWatermarkMapping) error + Get(*clientmodel.Fingerprint) (t time.Time, ok bool, err error) + State() *raw.DatabaseState + Size() (uint64, bool, error) +} + +type LeveldbHighWatermarker struct { + p *leveldb.LevelDBPersistence +} + +func (w *LeveldbHighWatermarker) Get(f *clientmodel.Fingerprint) (t time.Time, ok bool, err error) { + k := new(dto.Fingerprint) + dumpFingerprint(k, f) + v := new(dto.MetricHighWatermark) + ok, err = w.p.Get(k, v) + if err != nil { + return t, ok, err + } + if !ok { + return t, ok, err + } + t = time.Unix(v.GetTimestamp(), 0) + return t, true, nil +} + +func (w *LeveldbHighWatermarker) UpdateBatch(m FingerprintHighWatermarkMapping) error { + batch := leveldb.NewBatch() + defer batch.Close() + + for fp, t := range m { + existing, present, err := w.Get(&fp) + if err != nil { + return err + } + k := new(dto.Fingerprint) + dumpFingerprint(k, &fp) + v := new(dto.MetricHighWatermark) + if !present { + v.Timestamp = proto.Int64(t.Unix()) + batch.Put(k, v) + + continue + } + + // BUG(matt): Replace this with watermark management. + if t.After(existing) { + v.Timestamp = proto.Int64(t.Unix()) + batch.Put(k, v) + } + } + + return w.p.Commit(batch) +} + +func (i *LeveldbHighWatermarker) ForEach(d storage.RecordDecoder, f storage.RecordFilter, o storage.RecordOperator) (bool, error) { + return i.p.ForEach(d, f, o) +} + +func (i *LeveldbHighWatermarker) Prune() (bool, error) { + i.p.Prune() + + return false, nil +} + +func (i *LeveldbHighWatermarker) Close() error { + i.p.Close() + + return nil +} + +func (i *LeveldbHighWatermarker) State() *raw.DatabaseState { + return i.p.State() +} + +func (i *LeveldbHighWatermarker) Size() (uint64, bool, error) { + s, err := i.p.ApproximateSize() + return s, true, err +} + +type LevelDBHighWatermarkerOptions struct { + leveldb.LevelDBOptions +} + +func NewLevelDBHighWatermarker(o *LevelDBHighWatermarkerOptions) (HighWatermarker, error) { + s, err := leveldb.NewLevelDBPersistence(&o.LevelDBOptions) + if err != nil { + return nil, err + } + + return &LeveldbHighWatermarker{ + p: s, + }, nil +} diff --git a/storage/raw/index/leveldb/leveldb.go b/storage/raw/index/leveldb/leveldb.go index 7eb55bae2..16a8edd22 100644 --- a/storage/raw/index/leveldb/leveldb.go +++ b/storage/raw/index/leveldb/leveldb.go @@ -22,7 +22,7 @@ import ( "github.com/prometheus/prometheus/storage/raw/leveldb" ) -var existenceValue = &dto.MembershipIndexValue{} +var existenceValue = new(dto.MembershipIndexValue) type LevelDBMembershipIndex struct { persistence *leveldb.LevelDBPersistence @@ -44,18 +44,19 @@ func (l *LevelDBMembershipIndex) Put(k proto.Message) error { return l.persistence.Put(k, existenceValue) } -func NewLevelDBMembershipIndex(storageRoot string, cacheCapacity, bitsPerBloomFilterEncoded int) (i *LevelDBMembershipIndex, err error) { +type LevelDBIndexOptions struct { + leveldb.LevelDBOptions +} - leveldbPersistence, err := leveldb.NewLevelDBPersistence(storageRoot, cacheCapacity, bitsPerBloomFilterEncoded) +func NewLevelDBMembershipIndex(o *LevelDBIndexOptions) (i *LevelDBMembershipIndex, err error) { + leveldbPersistence, err := leveldb.NewLevelDBPersistence(&o.LevelDBOptions) if err != nil { - return + return nil, err } - i = &LevelDBMembershipIndex{ + return &LevelDBMembershipIndex{ persistence: leveldbPersistence, - } - - return + }, nil } func (l *LevelDBMembershipIndex) Commit(batch raw.Batch) error { @@ -66,14 +67,14 @@ func (l *LevelDBMembershipIndex) Commit(batch raw.Batch) error { // // Beware that it would probably be imprudent to run this on a live user-facing // server due to latency implications. -func (l *LevelDBMembershipIndex) CompactKeyspace() { - l.persistence.CompactKeyspace() +func (l *LevelDBMembershipIndex) Prune() { + l.persistence.Prune() } func (l *LevelDBMembershipIndex) ApproximateSize() (uint64, error) { return l.persistence.ApproximateSize() } -func (l *LevelDBMembershipIndex) State() leveldb.DatabaseState { +func (l *LevelDBMembershipIndex) State() *raw.DatabaseState { return l.persistence.State() } diff --git a/storage/raw/interface.go b/storage/raw/interface.go index c369724b8..25f226803 100644 --- a/storage/raw/interface.go +++ b/storage/raw/interface.go @@ -19,9 +19,23 @@ import ( "github.com/prometheus/prometheus/storage" ) +type ForEacher interface { + // ForEach is responsible for iterating through all records in the database + // until one of the following conditions are met: + // + // 1.) A system anomaly in the database scan. + // 2.) The last record in the database is reached. + // 3.) A FilterResult of STOP is emitted by the Filter. + // + // Decoding errors for an entity cause that entity to be skipped. + ForEach(storage.RecordDecoder, storage.RecordFilter, storage.RecordOperator) (scannedEntireCorpus bool, err error) +} + // Persistence models a key-value store for bytes that supports various // additional operations. type Persistence interface { + ForEacher + // Close reaps all of the underlying system resources associated with this // persistence. Close() @@ -34,15 +48,6 @@ type Persistence interface { Drop(key proto.Message) error // Put sets the key to a given value. Put(key, value proto.Message) error - // ForEach is responsible for iterating through all records in the database - // until one of the following conditions are met: - // - // 1.) A system anomaly in the database scan. - // 2.) The last record in the database is reached. - // 3.) A FilterResult of STOP is emitted by the Filter. - // - // Decoding errors for an entity cause that entity to be skipped. - ForEach(storage.RecordDecoder, storage.RecordFilter, storage.RecordOperator) (scannedEntireCorpus bool, err error) // Commit applies the Batch operations to the database. Commit(Batch) error } @@ -59,3 +64,7 @@ type Batch interface { // Drop follows the same protocol as Persistence.Drop. Drop(key proto.Message) } + +type Pruner interface { + Prune() (noop bool, err error) +} diff --git a/storage/raw/leveldb/leveldb.go b/storage/raw/leveldb/leveldb.go index 919aeedf8..22f23f097 100644 --- a/storage/raw/leveldb/leveldb.go +++ b/storage/raw/leveldb/leveldb.go @@ -14,7 +14,6 @@ package leveldb import ( - "flag" "fmt" "time" @@ -26,16 +25,11 @@ import ( "github.com/prometheus/prometheus/storage/raw" ) -var ( - leveldbFlushOnMutate = flag.Bool("leveldbFlushOnMutate", false, "Whether LevelDB should flush every operation to disk upon mutation before returning (bool).") - leveldbUseSnappy = flag.Bool("leveldbUseSnappy", true, "Whether LevelDB attempts to use Snappy for compressing elements (bool).") - leveldbUseParanoidChecks = flag.Bool("leveldbUseParanoidChecks", false, "Whether LevelDB uses expensive checks (bool).") - maximumOpenFiles = flag.Int("leveldb.maximumOpenFiles", 128, "The maximum number of files each LevelDB may maintain.") -) - // LevelDBPersistence is a disk-backed sorted key-value store. type LevelDBPersistence struct { - path string + path string + name string + purpose string cache *levigo.Cache filterPolicy *levigo.FilterPolicy @@ -169,25 +163,47 @@ func (i levigoIterator) GetError() (err error) { return i.iterator.GetError() } -func NewLevelDBPersistence(storageRoot string, cacheCapacity, bitsPerBloomFilterEncoded int) (*LevelDBPersistence, error) { +type Compression uint + +const ( + Snappy Compression = iota + Uncompressed +) + +type LevelDBOptions struct { + Path string + Name string + Purpose string + + CacheSizeBytes int + OpenFileAllowance int + + FlushOnMutate bool + UseParanoidChecks bool + + Compression Compression +} + +func NewLevelDBPersistence(o *LevelDBOptions) (*LevelDBPersistence, error) { options := levigo.NewOptions() options.SetCreateIfMissing(true) - options.SetParanoidChecks(*leveldbUseParanoidChecks) - compression := levigo.NoCompression - if *leveldbUseSnappy { - compression = levigo.SnappyCompression + options.SetParanoidChecks(o.UseParanoidChecks) + + compression := levigo.SnappyCompression + if o.Compression == Uncompressed { + compression = levigo.NoCompression } options.SetCompression(compression) - cache := levigo.NewLRUCache(cacheCapacity) + cache := levigo.NewLRUCache(o.CacheSizeBytes) options.SetCache(cache) - filterPolicy := levigo.NewBloomFilter(bitsPerBloomFilterEncoded) + filterPolicy := levigo.NewBloomFilter(10) options.SetFilterPolicy(filterPolicy) - options.SetMaxOpenFiles(*maximumOpenFiles) + options.SetMaxOpenFiles(o.OpenFileAllowance) - storage, err := levigo.Open(storageRoot, options) + storage, err := levigo.Open(o.Path, options) if err != nil { return nil, err } @@ -195,10 +211,12 @@ func NewLevelDBPersistence(storageRoot string, cacheCapacity, bitsPerBloomFilter readOptions := levigo.NewReadOptions() writeOptions := levigo.NewWriteOptions() - writeOptions.SetSync(*leveldbFlushOnMutate) + writeOptions.SetSync(o.FlushOnMutate) return &LevelDBPersistence{ - path: storageRoot, + path: o.Path, + name: o.Name, + purpose: o.Purpose, cache: cache, filterPolicy: filterPolicy, @@ -303,7 +321,7 @@ func (l *LevelDBPersistence) Commit(b raw.Batch) (err error) { // // Beware that it would probably be imprudent to run this on a live user-facing // server due to latency implications. -func (l *LevelDBPersistence) CompactKeyspace() { +func (l *LevelDBPersistence) Prune() { // Magic values per https://code.google.com/p/leveldb/source/browse/include/leveldb/db.h#131. keyspace := levigo.Range{ diff --git a/storage/raw/leveldb/state.go b/storage/raw/leveldb/state.go index 7723aac72..3de6dc457 100644 --- a/storage/raw/leveldb/state.go +++ b/storage/raw/leveldb/state.go @@ -14,8 +14,8 @@ package leveldb import ( + "github.com/prometheus/prometheus/storage/raw" "github.com/prometheus/prometheus/utility" - "time" ) const ( @@ -23,32 +23,22 @@ const ( sstablesKey = "leveldb.sstables" ) -// DatabaseState models a bundle of metadata about a LevelDB database used in -// template format string interpolation. -type DatabaseState struct { - LastRefreshed time.Time - Type string - Name string - Path string - LowLevelStatus string - SSTablesStatus string - ApproximateSize utility.ByteSize - Error error -} - -func (l *LevelDBPersistence) State() DatabaseState { - databaseState := DatabaseState{ - LastRefreshed: time.Now(), - Path: l.path, - LowLevelStatus: l.storage.PropertyValue(statsKey), - SSTablesStatus: l.storage.PropertyValue(sstablesKey), +func (l *LevelDBPersistence) State() *raw.DatabaseState { + databaseState := &raw.DatabaseState{ + Location: l.path, + Name: l.name, + Purpose: l.purpose, + Supplemental: map[string]string{}, } if size, err := l.ApproximateSize(); err != nil { - databaseState.Error = err + databaseState.Supplemental["Errors"] = err.Error() } else { - databaseState.ApproximateSize = utility.ByteSize(size) + databaseState.Size = utility.ByteSize(size) } + databaseState.Supplemental["Low Level"] = l.storage.PropertyValue(statsKey) + databaseState.Supplemental["SSTable"] = l.storage.PropertyValue(sstablesKey) + return databaseState } diff --git a/storage/raw/leveldb/test/fixtures.go b/storage/raw/leveldb/test/fixtures.go index cef9e77cf..808df886b 100644 --- a/storage/raw/leveldb/test/fixtures.go +++ b/storage/raw/leveldb/test/fixtures.go @@ -20,10 +20,7 @@ import ( "github.com/prometheus/prometheus/utility/test" ) -const ( - cacheCapacity = 0 - bitsPerBloomFilterEncoded = 0 -) +const cacheCapacity = 0 type ( // Pair models a prospective (key, value) double that will be committed to @@ -64,7 +61,11 @@ type ( func (p preparer) Prepare(n string, f FixtureFactory) (t test.TemporaryDirectory) { t = test.NewTemporaryDirectory(n, p.tester) - persistence, err := leveldb.NewLevelDBPersistence(t.Path(), cacheCapacity, bitsPerBloomFilterEncoded) + o := &leveldb.LevelDBOptions{ + Path: t.Path(), + CacheSizeBytes: cacheCapacity, + } + persistence, err := leveldb.NewLevelDBPersistence(o) if err != nil { defer t.Close() p.tester.Fatal(err) diff --git a/storage/raw/state.go b/storage/raw/state.go new file mode 100644 index 000000000..b5cd8698c --- /dev/null +++ b/storage/raw/state.go @@ -0,0 +1,53 @@ +// Copyright 2013 Prometheus Team +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package raw + +import ( + "github.com/prometheus/prometheus/utility" +) + +type DatabaseState struct { + Name string + + Size utility.ByteSize + + Location string + Purpose string + + Supplemental map[string]string +} + +type DatabaseStates []*DatabaseState + +func (s DatabaseStates) Len() int { + return len(s) +} + +func (s DatabaseStates) Less(i, j int) bool { + l := s[i] + r := s[j] + + if l.Name > r.Name { + return false + } + if l.Name < r.Name { + return true + } + + return l.Size < r.Size +} + +func (s DatabaseStates) Swap(i, j int) { + s[i], s[j] = s[j], s[i] +} diff --git a/tools/pruner/main.go b/tools/pruner/main.go index 61acca5d1..adb8924e8 100644 --- a/tools/pruner/main.go +++ b/tools/pruner/main.go @@ -44,7 +44,7 @@ func main() { log.Printf("Starting compaction...") size, _ := persistences.ApproximateSizes() log.Printf("Original Size: %d", size) - persistences.CompactKeyspaces() + persistences.Prune() log.Printf("Finished in %s", time.Since(start)) size, _ = persistences.ApproximateSizes() log.Printf("New Size: %d", size) diff --git a/web/databases.go b/web/databases.go index 2a1c07ade..fb6d48e80 100644 --- a/web/databases.go +++ b/web/databases.go @@ -14,29 +14,47 @@ package web import ( - "github.com/prometheus/prometheus/storage/raw/leveldb" "net/http" "sync" + "time" + + "github.com/prometheus/prometheus/storage/raw" ) -type DatabasesHandler struct { - States []leveldb.DatabaseState +type DatabaseStatesProvider interface { + States() raw.DatabaseStates +} - Incoming chan []leveldb.DatabaseState +type DatabasesHandler struct { + RefreshInterval time.Duration + NextRefresh time.Time + + Current raw.DatabaseStates + + Provider DatabaseStatesProvider mutex sync.RWMutex } func (h *DatabasesHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { - select { - case states := <-h.Incoming: - h.mutex.Lock() - defer h.mutex.Unlock() - h.States = states - default: - h.mutex.RLock() - defer h.mutex.RUnlock() - } + h.Refresh() + h.mutex.RLock() + defer h.mutex.RUnlock() executeTemplate(w, "databases", h) } + +func (h *DatabasesHandler) Refresh() { + h.mutex.RLock() + if !time.Now().After(h.NextRefresh) { + h.mutex.RUnlock() + return + } + h.mutex.RUnlock() + + h.mutex.Lock() + defer h.mutex.Unlock() + + h.Current = h.Provider.States() + h.NextRefresh = time.Now().Add(h.RefreshInterval) +} diff --git a/web/templates/databases.html b/web/templates/databases.html index 045222964..dd6e6b7ec 100644 --- a/web/templates/databases.html +++ b/web/templates/databases.html @@ -3,34 +3,28 @@ {{define "content"}}
Path | -{{.Path}} | +Location | +{{$database.Location}} |
---|---|---|---|
Last Refreshed | -{{.LastRefreshed}} | +Purpose | +{{$database.Purpose}} |
Type | -{{.Type}} | +Size | +{{$database.Size}} |
Approximate Size | -{{.ApproximateSize}} | -||
Low Level Status | -{{.LowLevelStatus}} |
- ||
SSTable Status | -{{.SSTablesStatus}} |
+ {{$subject}} | +{{$state}} |