mirror of
https://github.com/prometheus/prometheus.git
synced 2024-12-25 05:34:05 -08:00
Simplify compaction and expose database sizes.
This commit simplifies the way that compactions across a database's keyspace occur, following a reading of the LevelDB internals. Secondarily, it introduces the database size estimation mechanisms. Include database health and help interfaces. Add database statistics; remove status goroutines. This commit kills the use of goroutines to expose status throughout the web components of Prometheus. It also dumps raw LevelDB status on a separate /databases endpoint.
This commit is contained in:
parent
92ad65ff13
commit
b224251981
|
@ -1,3 +1,4 @@
|
|||
|
||||
# Prometheus
|
||||
|
||||
Bedecke deinen Himmel, Zeus! A new kid is in town.
|
||||
|
|
62
main.go
62
main.go
|
@ -20,6 +20,7 @@ import (
|
|||
"github.com/prometheus/prometheus/retrieval/format"
|
||||
"github.com/prometheus/prometheus/rules"
|
||||
"github.com/prometheus/prometheus/storage/metric"
|
||||
"github.com/prometheus/prometheus/storage/raw/leveldb"
|
||||
"github.com/prometheus/prometheus/web"
|
||||
"github.com/prometheus/prometheus/web/api"
|
||||
"log"
|
||||
|
@ -66,8 +67,10 @@ type prometheus struct {
|
|||
bodyCompactionTimer *time.Ticker
|
||||
tailCompactionTimer *time.Ticker
|
||||
deletionTimer *time.Ticker
|
||||
reportDatabasesTimer *time.Ticker
|
||||
curationMutex sync.Mutex
|
||||
curationState chan metric.CurationState
|
||||
databaseStates chan []leveldb.DatabaseState
|
||||
stopBackgroundOperations chan bool
|
||||
|
||||
ruleResults chan *rules.Result
|
||||
|
@ -132,6 +135,10 @@ func (p *prometheus) close() {
|
|||
p.deletionTimer.Stop()
|
||||
}
|
||||
|
||||
if p.reportDatabasesTimer != nil {
|
||||
p.reportDatabasesTimer.Stop()
|
||||
}
|
||||
|
||||
if len(p.stopBackgroundOperations) == 0 {
|
||||
p.stopBackgroundOperations <- true
|
||||
}
|
||||
|
@ -141,6 +148,26 @@ func (p *prometheus) close() {
|
|||
p.storage.Close()
|
||||
close(p.stopBackgroundOperations)
|
||||
close(p.curationState)
|
||||
close(p.databaseStates)
|
||||
}
|
||||
|
||||
func (p *prometheus) reportDatabaseState() {
|
||||
for _ = range p.reportDatabasesTimer.C {
|
||||
// BUG(matt): Per Julius, ...
|
||||
// These channel magic tricks confuse me and seem a bit awkward just to
|
||||
// pass a status around. Now that we have Go 1.1, would it be maybe be
|
||||
// nicer to pass ts.DiskStorage.States as a method value
|
||||
// (http://tip.golang.org/ref/spec#Method_values) to the web layer
|
||||
// instead of doing this?
|
||||
select {
|
||||
case <-p.databaseStates:
|
||||
// Reset the future database state if nobody consumes it.
|
||||
case p.databaseStates <- p.storage.DiskStorage.States():
|
||||
// Set the database state so someone can consume it if they want.
|
||||
default:
|
||||
// Don't block.
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func main() {
|
||||
|
@ -171,6 +198,7 @@ func main() {
|
|||
scrapeResults := make(chan format.Result, *scrapeResultsQueueCapacity)
|
||||
ruleResults := make(chan *rules.Result, *ruleResultsQueueCapacity)
|
||||
curationState := make(chan metric.CurationState, 1)
|
||||
databaseStates := make(chan []leveldb.DatabaseState, 1)
|
||||
// Coprime numbers, fool!
|
||||
headCompactionTimer := time.NewTicker(*headCompactInterval)
|
||||
bodyCompactionTimer := time.NewTicker(*bodyCompactInterval)
|
||||
|
@ -181,17 +209,25 @@ func main() {
|
|||
targetManager := retrieval.NewTargetManager(scrapeResults, *concurrentRetrievalAllowance)
|
||||
targetManager.AddTargetsFromConfig(conf)
|
||||
|
||||
flags := map[string]string{}
|
||||
|
||||
flag.VisitAll(func(f *flag.Flag) {
|
||||
flags[f.Name] = f.Value.String()
|
||||
})
|
||||
|
||||
statusHandler := &web.StatusHandler{
|
||||
BuildInfo: BuildInfo,
|
||||
Config: &conf,
|
||||
PrometheusStatus: &web.PrometheusStatus{
|
||||
BuildInfo: BuildInfo,
|
||||
Config: conf.String(),
|
||||
TargetPools: targetManager.Pools(),
|
||||
Flags: flags,
|
||||
},
|
||||
CurationState: curationState,
|
||||
// Furnish the default status.
|
||||
PrometheusStatus: &web.PrometheusStatus{},
|
||||
TargetManager: targetManager,
|
||||
}
|
||||
|
||||
// The closing of curationState implicitly closes this routine.
|
||||
go statusHandler.ServeRequestsForever()
|
||||
databasesHandler := &web.DatabasesHandler{
|
||||
Incoming: databaseStates,
|
||||
}
|
||||
|
||||
metricsService := &api.MetricsService{
|
||||
Config: &conf,
|
||||
|
@ -200,8 +236,9 @@ func main() {
|
|||
}
|
||||
|
||||
webService := &web.WebService{
|
||||
StatusHandler: statusHandler,
|
||||
MetricsHandler: metricsService,
|
||||
StatusHandler: statusHandler,
|
||||
MetricsHandler: metricsService,
|
||||
DatabasesHandler: databasesHandler,
|
||||
}
|
||||
|
||||
prometheus := prometheus{
|
||||
|
@ -211,7 +248,11 @@ func main() {
|
|||
|
||||
deletionTimer: deletionTimer,
|
||||
|
||||
curationState: curationState,
|
||||
reportDatabasesTimer: time.NewTicker(15 * time.Minute),
|
||||
|
||||
curationState: curationState,
|
||||
databaseStates: databaseStates,
|
||||
|
||||
ruleResults: ruleResults,
|
||||
scrapeResults: scrapeResults,
|
||||
|
||||
|
@ -223,6 +264,7 @@ func main() {
|
|||
|
||||
go ts.Serve()
|
||||
go prometheus.interruptHandler()
|
||||
go prometheus.reportDatabaseState()
|
||||
|
||||
go func() {
|
||||
for _ = range prometheus.headCompactionTimer.C {
|
||||
|
|
|
@ -119,9 +119,8 @@ func (c Curator) Run(ignoreYoungerThan time.Duration, instant time.Time, process
|
|||
}(time.Now())
|
||||
defer func() {
|
||||
select {
|
||||
case status <- CurationState{
|
||||
Active: false,
|
||||
}:
|
||||
case status <- CurationState{Active: false}:
|
||||
case <-status:
|
||||
default:
|
||||
}
|
||||
}()
|
||||
|
@ -268,6 +267,7 @@ func (w watermarkFilter) Filter(key, value interface{}) (r storage.FilterResult)
|
|||
Limit: w.ignoreYoungerThan,
|
||||
Fingerprint: fingerprint,
|
||||
}:
|
||||
case <-w.status:
|
||||
default:
|
||||
}
|
||||
}()
|
||||
|
|
|
@ -22,7 +22,7 @@ import (
|
|||
dto "github.com/prometheus/prometheus/model/generated"
|
||||
"github.com/prometheus/prometheus/storage"
|
||||
index "github.com/prometheus/prometheus/storage/raw/index/leveldb"
|
||||
leveldb "github.com/prometheus/prometheus/storage/raw/leveldb"
|
||||
"github.com/prometheus/prometheus/storage/raw/leveldb"
|
||||
"github.com/prometheus/prometheus/utility"
|
||||
"log"
|
||||
"sort"
|
||||
|
@ -931,3 +931,44 @@ func (l *LevelDBMetricPersistence) ApproximateSizes() (total uint64, err error)
|
|||
|
||||
return total, nil
|
||||
}
|
||||
|
||||
func (l *LevelDBMetricPersistence) States() []leveldb.DatabaseState {
|
||||
states := []leveldb.DatabaseState{}
|
||||
|
||||
state := l.CurationRemarks.State()
|
||||
state.Name = "Curation Remarks"
|
||||
state.Type = "Watermark"
|
||||
states = append(states, state)
|
||||
|
||||
state = l.fingerprintToMetrics.State()
|
||||
state.Name = "Fingerprints to Metrics"
|
||||
state.Type = "Index"
|
||||
states = append(states, state)
|
||||
|
||||
state = l.labelNameToFingerprints.State()
|
||||
state.Name = "Label Name to Fingerprints"
|
||||
state.Type = "Inverted Index"
|
||||
states = append(states, state)
|
||||
|
||||
state = l.labelSetToFingerprints.State()
|
||||
state.Name = "Label Pair to Fingerprints"
|
||||
state.Type = "Inverted Index"
|
||||
states = append(states, state)
|
||||
|
||||
state = l.MetricHighWatermarks.State()
|
||||
state.Name = "Metric Last Write"
|
||||
state.Type = "Watermark"
|
||||
states = append(states, state)
|
||||
|
||||
state = l.metricMembershipIndex.State()
|
||||
state.Name = "Metric Membership"
|
||||
state.Type = "Index"
|
||||
states = append(states, state)
|
||||
|
||||
state = l.MetricSamples.State()
|
||||
state.Name = "Samples"
|
||||
state.Type = "Time Series"
|
||||
states = append(states, state)
|
||||
|
||||
return states
|
||||
}
|
||||
|
|
|
@ -59,20 +59,27 @@ type TieredStorage struct {
|
|||
// BUG(matt): This introduces a Law of Demeter violation. Ugh.
|
||||
DiskStorage *LevelDBMetricPersistence
|
||||
|
||||
appendToDiskQueue chan model.Samples
|
||||
diskFrontier *diskFrontier
|
||||
draining chan chan bool
|
||||
flushMemoryInterval time.Duration
|
||||
appendToDiskQueue chan model.Samples
|
||||
|
||||
diskFrontier *diskFrontier
|
||||
|
||||
memoryArena memorySeriesStorage
|
||||
memoryTTL time.Duration
|
||||
flushMemoryInterval time.Duration
|
||||
writeMemoryInterval time.Duration
|
||||
|
||||
// This mutex manages any concurrent reads/writes of the memoryArena.
|
||||
memoryMutex sync.RWMutex
|
||||
// This mutex blocks only deletions from the memoryArena. It is held for a
|
||||
// potentially long time for an entire renderView() duration, since we depend
|
||||
// on no samples being removed from memory after grabbing a LevelDB snapshot.
|
||||
memoryDeleteMutex sync.RWMutex
|
||||
viewQueue chan viewJob
|
||||
writeMemoryInterval time.Duration
|
||||
memoryDeleteMutex sync.RWMutex
|
||||
|
||||
viewQueue chan viewJob
|
||||
|
||||
draining chan chan bool
|
||||
|
||||
mutex sync.Mutex
|
||||
}
|
||||
|
||||
// viewJob encapsulates a request to extract sample values from the datastore.
|
||||
|
@ -180,11 +187,13 @@ func (t *TieredStorage) rebuildDiskFrontier(i leveldb.Iterator) (err error) {
|
|||
func (t *TieredStorage) Serve() {
|
||||
flushMemoryTicker := time.NewTicker(t.flushMemoryInterval)
|
||||
defer flushMemoryTicker.Stop()
|
||||
reportTicker := time.NewTicker(time.Second)
|
||||
defer reportTicker.Stop()
|
||||
writeMemoryTicker := time.NewTicker(t.writeMemoryInterval)
|
||||
defer writeMemoryTicker.Stop()
|
||||
queueReportTicker := time.NewTicker(time.Second)
|
||||
defer queueReportTicker.Stop()
|
||||
|
||||
go func() {
|
||||
for _ = range reportTicker.C {
|
||||
for _ = range queueReportTicker.C {
|
||||
t.reportQueues()
|
||||
}
|
||||
}()
|
||||
|
|
|
@ -73,3 +73,7 @@ func (l *LevelDBMembershipIndex) CompactKeyspace() {
|
|||
func (l *LevelDBMembershipIndex) ApproximateSize() (uint64, error) {
|
||||
return l.persistence.ApproximateSize()
|
||||
}
|
||||
|
||||
func (l *LevelDBMembershipIndex) State() leveldb.DatabaseState {
|
||||
return l.persistence.State()
|
||||
}
|
||||
|
|
|
@ -31,6 +31,8 @@ var (
|
|||
|
||||
// LevelDBPersistence is a disk-backed sorted key-value store.
|
||||
type LevelDBPersistence struct {
|
||||
path string
|
||||
|
||||
cache *levigo.Cache
|
||||
filterPolicy *levigo.FilterPolicy
|
||||
options *levigo.Options
|
||||
|
@ -67,11 +69,9 @@ type levigoIterator struct {
|
|||
}
|
||||
|
||||
func (i levigoIterator) String() string {
|
||||
var (
|
||||
valid = "valid"
|
||||
open = "open"
|
||||
snapshotted = "snapshotted"
|
||||
)
|
||||
valid := "valid"
|
||||
open := "open"
|
||||
snapshotted := "snapshotted"
|
||||
|
||||
if i.closed {
|
||||
open = "closed"
|
||||
|
@ -193,6 +193,8 @@ func NewLevelDBPersistence(storageRoot string, cacheCapacity, bitsPerBloomFilter
|
|||
|
||||
writeOptions.SetSync(*leveldbFlushOnMutate)
|
||||
p = &LevelDBPersistence{
|
||||
path: storageRoot,
|
||||
|
||||
cache: cache,
|
||||
filterPolicy: filterPolicy,
|
||||
|
||||
|
|
54
storage/raw/leveldb/state.go
Normal file
54
storage/raw/leveldb/state.go
Normal file
|
@ -0,0 +1,54 @@
|
|||
// Copyright 2013 Prometheus Team
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package leveldb
|
||||
|
||||
import (
|
||||
"github.com/prometheus/prometheus/utility"
|
||||
"time"
|
||||
)
|
||||
|
||||
const (
|
||||
statsKey = "leveldb.stats"
|
||||
sstablesKey = "leveldb.sstables"
|
||||
)
|
||||
|
||||
// DatabaseState models a bundle of metadata about a LevelDB database used in
|
||||
// template format string interpolation.
|
||||
type DatabaseState struct {
|
||||
LastRefreshed time.Time
|
||||
Type string
|
||||
Name string
|
||||
Path string
|
||||
LowLevelStatus string
|
||||
SSTablesStatus string
|
||||
ApproximateSize utility.ByteSize
|
||||
Error error
|
||||
}
|
||||
|
||||
func (l *LevelDBPersistence) State() DatabaseState {
|
||||
databaseState := DatabaseState{
|
||||
LastRefreshed: time.Now(),
|
||||
Path: l.path,
|
||||
LowLevelStatus: l.storage.PropertyValue(statsKey),
|
||||
SSTablesStatus: l.storage.PropertyValue(sstablesKey),
|
||||
}
|
||||
|
||||
if size, err := l.ApproximateSize(); err != nil {
|
||||
databaseState.Error = err
|
||||
} else {
|
||||
databaseState.ApproximateSize = utility.ByteSize(size)
|
||||
}
|
||||
|
||||
return databaseState
|
||||
}
|
55
utility/bytesize.go
Normal file
55
utility/bytesize.go
Normal file
|
@ -0,0 +1,55 @@
|
|||
// Copyright 2013 Prometheus Team
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package utility
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
)
|
||||
|
||||
// The canonical example: http://golang.org/doc/progs/eff_bytesize.go.
|
||||
// ByteSize is a quantity of bytes that formats itself using the largest
// binary unit (KB, MB, ... YB) that does not exceed its magnitude.
type ByteSize float64

// Binary byte units; each one is 1024 times the previous one.
const (
	_           = iota // ignore first value by assigning to blank identifier
	KB ByteSize = 1 << (10 * iota)
	MB
	GB
	TB
	PB
	EB
	ZB
	YB
)

// String renders b with two decimal places followed by its unit, for example
// "1.50KB" or "12.00B".  The unit is chosen from the magnitude of b, so
// negative quantities are scaled the same way as positive ones ("-2.00KB").
func (b ByteSize) String() string {
	// Pick the unit by absolute value; comparing the signed value would leave
	// every negative quantity expressed in plain bytes.
	magnitude := b
	if magnitude < 0 {
		magnitude = -magnitude
	}

	// The values are converted to float64 before formatting so that Sprintf
	// is handed a plain number rather than a value that has a String method.
	switch {
	case magnitude >= YB:
		return fmt.Sprintf("%.2fYB", float64(b/YB))
	case magnitude >= ZB:
		return fmt.Sprintf("%.2fZB", float64(b/ZB))
	case magnitude >= EB:
		return fmt.Sprintf("%.2fEB", float64(b/EB))
	case magnitude >= PB:
		return fmt.Sprintf("%.2fPB", float64(b/PB))
	case magnitude >= TB:
		return fmt.Sprintf("%.2fTB", float64(b/TB))
	case magnitude >= GB:
		return fmt.Sprintf("%.2fGB", float64(b/GB))
	case magnitude >= MB:
		return fmt.Sprintf("%.2fMB", float64(b/MB))
	case magnitude >= KB:
		return fmt.Sprintf("%.2fKB", float64(b/KB))
	}
	return fmt.Sprintf("%.2fB", float64(b))
}
|
42
web/databases.go
Normal file
42
web/databases.go
Normal file
|
@ -0,0 +1,42 @@
|
|||
// Copyright 2013 Prometheus Team
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package web
|
||||
|
||||
import (
|
||||
"github.com/prometheus/prometheus/storage/raw/leveldb"
|
||||
"net/http"
|
||||
"sync"
|
||||
)
|
||||
|
||||
type DatabasesHandler struct {
|
||||
States []leveldb.DatabaseState
|
||||
|
||||
Incoming chan []leveldb.DatabaseState
|
||||
|
||||
mutex sync.RWMutex
|
||||
}
|
||||
|
||||
func (h *DatabasesHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
select {
|
||||
case states := <-h.Incoming:
|
||||
h.mutex.Lock()
|
||||
defer h.mutex.Unlock()
|
||||
h.States = states
|
||||
default:
|
||||
h.mutex.RLock()
|
||||
defer h.mutex.RUnlock()
|
||||
}
|
||||
|
||||
executeTemplate(w, "databases", h)
|
||||
}
|
|
@ -79,3 +79,7 @@ input[name=end_input], input[name=range_input] {
|
|||
#add_graph {
|
||||
margin-left: 8px;
|
||||
}
|
||||
|
||||
.literal_output td {
|
||||
font-family: monospace;
|
||||
}
|
|
@ -14,8 +14,6 @@
|
|||
package web
|
||||
|
||||
import (
|
||||
"flag"
|
||||
"github.com/prometheus/prometheus/config"
|
||||
"github.com/prometheus/prometheus/retrieval"
|
||||
"github.com/prometheus/prometheus/storage/metric"
|
||||
"net/http"
|
||||
|
@ -32,39 +30,22 @@ type PrometheusStatus struct {
|
|||
}
|
||||
|
||||
type StatusHandler struct {
|
||||
sync.Mutex
|
||||
BuildInfo map[string]string
|
||||
Config *config.Config
|
||||
CurationState chan metric.CurationState
|
||||
PrometheusStatus *PrometheusStatus
|
||||
TargetManager retrieval.TargetManager
|
||||
}
|
||||
|
||||
func (h *StatusHandler) ServeRequestsForever() {
|
||||
flags := map[string]string{}
|
||||
|
||||
flag.VisitAll(func(f *flag.Flag) {
|
||||
flags[f.Name] = f.Value.String()
|
||||
})
|
||||
|
||||
h.PrometheusStatus = &PrometheusStatus{
|
||||
BuildInfo: h.BuildInfo,
|
||||
Config: h.Config.String(),
|
||||
Flags: flags,
|
||||
Rules: "TODO: list rules here",
|
||||
// BUG: race condition, concurrent map access
|
||||
TargetPools: h.TargetManager.Pools(),
|
||||
}
|
||||
|
||||
for state := range h.CurationState {
|
||||
h.Lock()
|
||||
h.PrometheusStatus.Curation = state
|
||||
h.Unlock()
|
||||
}
|
||||
mutex sync.RWMutex
|
||||
}
|
||||
|
||||
func (h *StatusHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
h.Lock()
|
||||
defer h.Unlock()
|
||||
select {
|
||||
case curationState := <-h.CurationState:
|
||||
h.mutex.Lock()
|
||||
defer h.mutex.Unlock()
|
||||
h.PrometheusStatus.Curation = curationState
|
||||
default:
|
||||
h.mutex.RLock()
|
||||
defer h.mutex.RUnlock()
|
||||
}
|
||||
|
||||
executeTemplate(w, "status", h.PrometheusStatus)
|
||||
}
|
||||
|
|
|
@ -2,7 +2,7 @@
|
|||
<html lang="en">
|
||||
<head>
|
||||
<meta http-equiv="Content-Type" content="text/html; charset=utf-8">
|
||||
<title>Prometheus</title>
|
||||
<title>Prometheus Time Series Collection and Processing Server</title>
|
||||
<script src="/static/vendor/js/jquery.min.js"></script>
|
||||
<link type="text/css" rel="stylesheet" href="/static/css/prometheus.css">
|
||||
{{template "head" .}}
|
||||
|
@ -10,8 +10,12 @@
|
|||
|
||||
<body>
|
||||
<div id="navigation">
|
||||
<a href="/graph">Graph</a>
|
||||
<a href="/graph">Graph & Console</a>
|
||||
<a href="/">Status</a>
|
||||
<a href="/databases">Databases</a>
|
||||
|
||||
<!-- Help should preferentially be the right-most element. -->
|
||||
<a href="https://github.com/prometheus/prometheus/wiki" target="_blank">Help</a>
|
||||
</div>
|
||||
{{template "content" .}}
|
||||
</body>
|
||||
|
|
|
@ -34,8 +34,9 @@ var (
|
|||
)
|
||||
|
||||
type WebService struct {
|
||||
StatusHandler *StatusHandler
|
||||
MetricsHandler *api.MetricsService
|
||||
StatusHandler *StatusHandler
|
||||
DatabasesHandler *DatabasesHandler
|
||||
MetricsHandler *api.MetricsService
|
||||
}
|
||||
|
||||
func (w WebService) ServeForever() error {
|
||||
|
@ -53,6 +54,7 @@ func (w WebService) ServeForever() error {
|
|||
exp.Handle("/debug/pprof/symbol", http.HandlerFunc(pprof.Symbol))
|
||||
|
||||
exp.Handle("/", w.StatusHandler)
|
||||
exp.Handle("/databases", w.DatabasesHandler)
|
||||
exp.HandleFunc("/graph", graphHandler)
|
||||
|
||||
exp.Handle("/api/", gorest.Handle())
|
||||
|
|
Loading…
Reference in a new issue