mirror of
https://github.com/prometheus/prometheus.git
synced 2025-03-05 20:59:13 -08:00
web: start web handler while TSDB is starting up
This commit is contained in:
parent
d6fbfb49eb
commit
7b02bfee0a
|
@ -215,35 +215,17 @@ func main() {
|
||||||
level.Info(logger).Log("build_context", version.BuildContext())
|
level.Info(logger).Log("build_context", version.BuildContext())
|
||||||
level.Info(logger).Log("host_details", Uname())
|
level.Info(logger).Log("host_details", Uname())
|
||||||
|
|
||||||
var (
|
|
||||||
// sampleAppender = storage.Fanout{}
|
|
||||||
reloadables []Reloadable
|
|
||||||
)
|
|
||||||
|
|
||||||
// Make sure that sighup handler is registered with a redirect to the channel before the potentially
|
// Make sure that sighup handler is registered with a redirect to the channel before the potentially
|
||||||
// long and synchronous tsdb init.
|
// long and synchronous tsdb init.
|
||||||
hup := make(chan os.Signal)
|
hup := make(chan os.Signal)
|
||||||
hupReady := make(chan bool)
|
hupReady := make(chan bool)
|
||||||
signal.Notify(hup, syscall.SIGHUP)
|
signal.Notify(hup, syscall.SIGHUP)
|
||||||
|
|
||||||
level.Info(logger).Log("msg", "Starting TSDB")
|
var (
|
||||||
|
localStorage = &tsdb.ReadyStorage{}
|
||||||
localStorage, err := tsdb.Open(
|
remoteStorage = remote.NewStorage(log.With(logger, "component", "remote"))
|
||||||
cfg.localStoragePath,
|
fanoutStorage = storage.NewFanout(logger, localStorage, remoteStorage)
|
||||||
log.With(logger, "component", "tsdb"),
|
|
||||||
prometheus.DefaultRegisterer,
|
|
||||||
&cfg.tsdb,
|
|
||||||
)
|
)
|
||||||
if err != nil {
|
|
||||||
level.Error(logger).Log("msg", "Opening TSDB failed", "err", err)
|
|
||||||
os.Exit(1)
|
|
||||||
}
|
|
||||||
|
|
||||||
level.Info(logger).Log("msg", "TSDB succesfully started")
|
|
||||||
|
|
||||||
remoteStorage := remote.NewStorage(log.With(logger, "component", "remote"))
|
|
||||||
reloadables = append(reloadables, remoteStorage)
|
|
||||||
fanoutStorage := storage.NewFanout(logger, tsdb.Adapter(localStorage), remoteStorage)
|
|
||||||
|
|
||||||
cfg.queryEngine.Logger = log.With(logger, "component", "query engine")
|
cfg.queryEngine.Logger = log.With(logger, "component", "query engine")
|
||||||
var (
|
var (
|
||||||
|
@ -263,7 +245,8 @@ func main() {
|
||||||
})
|
})
|
||||||
|
|
||||||
cfg.web.Context = ctx
|
cfg.web.Context = ctx
|
||||||
cfg.web.Storage = localStorage
|
cfg.web.TSDB = localStorage.Get
|
||||||
|
cfg.web.Storage = fanoutStorage
|
||||||
cfg.web.QueryEngine = queryEngine
|
cfg.web.QueryEngine = queryEngine
|
||||||
cfg.web.TargetManager = targetManager
|
cfg.web.TargetManager = targetManager
|
||||||
cfg.web.RuleManager = ruleManager
|
cfg.web.RuleManager = ruleManager
|
||||||
|
@ -285,11 +268,12 @@ func main() {
|
||||||
|
|
||||||
webHandler := web.New(log.With(logger, "component", "web"), &cfg.web)
|
webHandler := web.New(log.With(logger, "component", "web"), &cfg.web)
|
||||||
|
|
||||||
reloadables = append(reloadables, targetManager, ruleManager, webHandler, notifier)
|
reloadables := []Reloadable{
|
||||||
|
remoteStorage,
|
||||||
if err := reloadConfig(cfg.configFile, logger, reloadables...); err != nil {
|
targetManager,
|
||||||
level.Error(logger).Log("msg", "Error loading config", "err", err)
|
ruleManager,
|
||||||
os.Exit(1)
|
webHandler,
|
||||||
|
notifier,
|
||||||
}
|
}
|
||||||
|
|
||||||
// Wait for reload or termination signals. Start the handler for SIGHUP as
|
// Wait for reload or termination signals. Start the handler for SIGHUP as
|
||||||
|
@ -314,14 +298,29 @@ func main() {
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
// Start all components. The order is NOT arbitrary.
|
// Start all components while we wait for TSDB to open but only load
|
||||||
defer func() {
|
// initial config and mark ourselves as ready after it completed.
|
||||||
if err := fanoutStorage.Close(); err != nil {
|
dbOpen := make(chan struct{})
|
||||||
level.Error(logger).Log("msg", "Closing storage(s) failed", "err", err)
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
// defer remoteStorage.Stop()
|
go func() {
|
||||||
|
defer close(dbOpen)
|
||||||
|
|
||||||
|
level.Info(logger).Log("msg", "Starting TSDB")
|
||||||
|
|
||||||
|
db, err := tsdb.Open(
|
||||||
|
cfg.localStoragePath,
|
||||||
|
log.With(logger, "component", "tsdb"),
|
||||||
|
prometheus.DefaultRegisterer,
|
||||||
|
&cfg.tsdb,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
level.Error(logger).Log("msg", "Opening storage failed", "err", err)
|
||||||
|
os.Exit(1)
|
||||||
|
}
|
||||||
|
level.Info(logger).Log("msg", "TSDB started")
|
||||||
|
|
||||||
|
localStorage.Set(db)
|
||||||
|
}()
|
||||||
|
|
||||||
prometheus.MustRegister(configSuccess)
|
prometheus.MustRegister(configSuccess)
|
||||||
prometheus.MustRegister(configSuccessTime)
|
prometheus.MustRegister(configSuccessTime)
|
||||||
|
@ -344,6 +343,19 @@ func main() {
|
||||||
errc := make(chan error)
|
errc := make(chan error)
|
||||||
go func() { errc <- webHandler.Run(ctx) }()
|
go func() { errc <- webHandler.Run(ctx) }()
|
||||||
|
|
||||||
|
<-dbOpen
|
||||||
|
|
||||||
|
if err := reloadConfig(cfg.configFile, logger, reloadables...); err != nil {
|
||||||
|
level.Error(logger).Log("msg", "Error loading config", "err", err)
|
||||||
|
os.Exit(1)
|
||||||
|
}
|
||||||
|
|
||||||
|
defer func() {
|
||||||
|
if err := fanoutStorage.Close(); err != nil {
|
||||||
|
level.Error(logger).Log("msg", "Error stopping storage", "err", err)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
// Wait for reload or termination signals.
|
// Wait for reload or termination signals.
|
||||||
close(hupReady) // Unblock SIGHUP handler.
|
close(hupReady) // Unblock SIGHUP handler.
|
||||||
|
|
||||||
|
|
|
@ -914,6 +914,7 @@ loop:
|
||||||
added++
|
added++
|
||||||
continue
|
continue
|
||||||
default:
|
default:
|
||||||
|
level.Debug(sl.l).Log("msg", "unexpected error", "series", string(met), "err", err)
|
||||||
break loop
|
break loop
|
||||||
}
|
}
|
||||||
if tp == nil {
|
if tp == nil {
|
||||||
|
|
|
@ -14,6 +14,7 @@
|
||||||
package tsdb
|
package tsdb
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
"unsafe"
|
"unsafe"
|
||||||
|
|
||||||
|
@ -27,6 +28,63 @@ import (
|
||||||
tsdbLabels "github.com/prometheus/tsdb/labels"
|
tsdbLabels "github.com/prometheus/tsdb/labels"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// ErrNotReady is returned if the underlying storage is not ready yet.
|
||||||
|
var ErrNotReady = errors.New("TSDB not ready")
|
||||||
|
|
||||||
|
// ReadyStorage implements the Storage interface while allowing to set the actual
|
||||||
|
// storage at a later point in time.
|
||||||
|
type ReadyStorage struct {
|
||||||
|
mtx sync.RWMutex
|
||||||
|
a *adapter
|
||||||
|
}
|
||||||
|
|
||||||
|
// Set the storage.
|
||||||
|
func (s *ReadyStorage) Set(db *tsdb.DB) {
|
||||||
|
s.mtx.Lock()
|
||||||
|
defer s.mtx.Unlock()
|
||||||
|
|
||||||
|
s.a = &adapter{db: db}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get the storage.
|
||||||
|
func (s *ReadyStorage) Get() *tsdb.DB {
|
||||||
|
if x := s.get(); x != nil {
|
||||||
|
return x.db
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *ReadyStorage) get() *adapter {
|
||||||
|
s.mtx.RLock()
|
||||||
|
x := s.a
|
||||||
|
s.mtx.RUnlock()
|
||||||
|
return x
|
||||||
|
}
|
||||||
|
|
||||||
|
// Querier implements the Storage interface.
|
||||||
|
func (s *ReadyStorage) Querier(mint, maxt int64) (storage.Querier, error) {
|
||||||
|
if x := s.get(); x != nil {
|
||||||
|
return x.Querier(mint, maxt)
|
||||||
|
}
|
||||||
|
return nil, ErrNotReady
|
||||||
|
}
|
||||||
|
|
||||||
|
// Appender implements the Storage interface.
|
||||||
|
func (s *ReadyStorage) Appender() (storage.Appender, error) {
|
||||||
|
if x := s.get(); x != nil {
|
||||||
|
return x.Appender()
|
||||||
|
}
|
||||||
|
return nil, ErrNotReady
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close implements the Storage interface.
|
||||||
|
func (s *ReadyStorage) Close() error {
|
||||||
|
if x := s.Get(); x != nil {
|
||||||
|
return x.Close()
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func Adapter(db *tsdb.DB) storage.Storage {
|
func Adapter(db *tsdb.DB) storage.Storage {
|
||||||
return &adapter{db: db}
|
return &adapter{db: db}
|
||||||
}
|
}
|
||||||
|
|
5
vendor/github.com/prometheus/tsdb/block.go
generated
vendored
5
vendor/github.com/prometheus/tsdb/block.go
generated
vendored
|
@ -64,11 +64,6 @@ type Appendable interface {
|
||||||
Appender() Appender
|
Appender() Appender
|
||||||
}
|
}
|
||||||
|
|
||||||
// Queryable defines an entity which provides a Querier.
|
|
||||||
type Queryable interface {
|
|
||||||
Querier(mint, maxt int64) Querier
|
|
||||||
}
|
|
||||||
|
|
||||||
// BlockMeta provides meta information about a block.
|
// BlockMeta provides meta information about a block.
|
||||||
type BlockMeta struct {
|
type BlockMeta struct {
|
||||||
// Unique identifier for the block and its contents. Changes on compaction.
|
// Unique identifier for the block and its contents. Changes on compaction.
|
||||||
|
|
3
vendor/github.com/prometheus/tsdb/db.go
generated
vendored
3
vendor/github.com/prometheus/tsdb/db.go
generated
vendored
|
@ -165,8 +165,7 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
if l == nil {
|
if l == nil {
|
||||||
l = log.NewLogfmtLogger(os.Stdout)
|
l = log.NewNopLogger()
|
||||||
l = log.With(l, "ts", log.DefaultTimestampUTC, "caller", log.DefaultCaller)
|
|
||||||
}
|
}
|
||||||
if opts == nil {
|
if opts == nil {
|
||||||
opts = DefaultOptions
|
opts = DefaultOptions
|
||||||
|
|
46
vendor/github.com/prometheus/tsdb/head.go
generated
vendored
46
vendor/github.com/prometheus/tsdb/head.go
generated
vendored
|
@ -185,13 +185,18 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal WAL, chunkRange int64) (
|
||||||
return h, nil
|
return h, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ReadWAL initializes the head by consuming the write ahead log.
|
||||||
func (h *Head) ReadWAL() error {
|
func (h *Head) ReadWAL() error {
|
||||||
r := h.wal.Reader()
|
r := h.wal.Reader()
|
||||||
mint := h.MinTime()
|
mint := h.MinTime()
|
||||||
|
|
||||||
seriesFunc := func(series []RefSeries) error {
|
seriesFunc := func(series []RefSeries) error {
|
||||||
for _, s := range series {
|
for _, s := range series {
|
||||||
h.create(s.Labels.Hash(), s.Labels)
|
h.getOrCreateWithID(s.Ref, s.Labels.Hash(), s.Labels)
|
||||||
|
|
||||||
|
if h.lastSeriesID < s.Ref {
|
||||||
|
h.lastSeriesID = s.Ref
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -202,7 +207,8 @@ func (h *Head) ReadWAL() error {
|
||||||
}
|
}
|
||||||
ms := h.series.getByID(s.Ref)
|
ms := h.series.getByID(s.Ref)
|
||||||
if ms == nil {
|
if ms == nil {
|
||||||
return errors.Errorf("unknown series reference %d; abort WAL restore", s.Ref)
|
h.logger.Log("msg", "unknown series reference in WAL", "ref", s.Ref)
|
||||||
|
continue
|
||||||
}
|
}
|
||||||
_, chunkCreated := ms.append(s.T, s.V)
|
_, chunkCreated := ms.append(s.T, s.V)
|
||||||
if chunkCreated {
|
if chunkCreated {
|
||||||
|
@ -210,7 +216,6 @@ func (h *Head) ReadWAL() error {
|
||||||
h.metrics.chunks.Inc()
|
h.metrics.chunks.Inc()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
deletesFunc := func(stones []Stone) error {
|
deletesFunc := func(stones []Stone) error {
|
||||||
|
@ -222,7 +227,6 @@ func (h *Head) ReadWAL() error {
|
||||||
h.tombstones.add(s.ref, itv)
|
h.tombstones.add(s.ref, itv)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -379,17 +383,12 @@ func (a *headAppender) Add(lset labels.Labels, t int64, v float64) (uint64, erro
|
||||||
if t < a.mint {
|
if t < a.mint {
|
||||||
return 0, ErrOutOfBounds
|
return 0, ErrOutOfBounds
|
||||||
}
|
}
|
||||||
hash := lset.Hash()
|
|
||||||
|
|
||||||
s := a.head.series.getByHash(hash, lset)
|
|
||||||
|
|
||||||
if s == nil {
|
|
||||||
s = a.head.create(hash, lset)
|
|
||||||
|
|
||||||
|
s, created := a.head.getOrCreate(lset.Hash(), lset)
|
||||||
|
if created {
|
||||||
a.series = append(a.series, RefSeries{
|
a.series = append(a.series, RefSeries{
|
||||||
Ref: s.ref,
|
Ref: s.ref,
|
||||||
Labels: lset,
|
Labels: lset,
|
||||||
hash: hash,
|
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
return s.ref, a.AddFast(s.ref, t, v)
|
return s.ref, a.AddFast(s.ref, t, v)
|
||||||
|
@ -839,20 +838,32 @@ func (h *headIndexReader) LabelIndices() ([][]string, error) {
|
||||||
return res, nil
|
return res, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *Head) create(hash uint64, lset labels.Labels) *memSeries {
|
func (h *Head) getOrCreate(hash uint64, lset labels.Labels) (*memSeries, bool) {
|
||||||
h.metrics.series.Inc()
|
// Just using `getOrSet` below would be semantically sufficient, but we'd create
|
||||||
h.metrics.seriesCreated.Inc()
|
// a new series on every sample inserted via Add(), which causes allocations
|
||||||
|
// and makes our series IDs rather random and harder to compress in postings.
|
||||||
|
s := h.series.getByHash(hash, lset)
|
||||||
|
if s != nil {
|
||||||
|
return s, false
|
||||||
|
}
|
||||||
|
|
||||||
// Optimistically assume that we are the first one to create the series.
|
// Optimistically assume that we are the first one to create the series.
|
||||||
id := atomic.AddUint64(&h.lastSeriesID, 1)
|
id := atomic.AddUint64(&h.lastSeriesID, 1)
|
||||||
|
|
||||||
|
return h.getOrCreateWithID(id, hash, lset)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *Head) getOrCreateWithID(id, hash uint64, lset labels.Labels) (*memSeries, bool) {
|
||||||
s := newMemSeries(lset, id, h.chunkRange)
|
s := newMemSeries(lset, id, h.chunkRange)
|
||||||
|
|
||||||
s, created := h.series.getOrSet(hash, s)
|
s, created := h.series.getOrSet(hash, s)
|
||||||
// Skip indexing if we didn't actually create the series.
|
|
||||||
if !created {
|
if !created {
|
||||||
return s
|
return s, false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
h.metrics.series.Inc()
|
||||||
|
h.metrics.seriesCreated.Inc()
|
||||||
|
|
||||||
h.postings.add(id, lset)
|
h.postings.add(id, lset)
|
||||||
|
|
||||||
h.symMtx.Lock()
|
h.symMtx.Lock()
|
||||||
|
@ -870,7 +881,7 @@ func (h *Head) create(hash uint64, lset labels.Labels) *memSeries {
|
||||||
h.symbols[l.Value] = struct{}{}
|
h.symbols[l.Value] = struct{}{}
|
||||||
}
|
}
|
||||||
|
|
||||||
return s
|
return s, true
|
||||||
}
|
}
|
||||||
|
|
||||||
// seriesHashmap is a simple hashmap for memSeries by their label set. It is built
|
// seriesHashmap is a simple hashmap for memSeries by their label set. It is built
|
||||||
|
@ -1023,6 +1034,7 @@ func (s *stripeSeries) getOrSet(hash uint64, series *memSeries) (*memSeries, boo
|
||||||
s.locks[i].Lock()
|
s.locks[i].Lock()
|
||||||
|
|
||||||
if prev := s.hashes[i].get(hash, series.lset); prev != nil {
|
if prev := s.hashes[i].get(hash, series.lset); prev != nil {
|
||||||
|
s.locks[i].Unlock()
|
||||||
return prev, false
|
return prev, false
|
||||||
}
|
}
|
||||||
s.hashes[i].set(hash, series)
|
s.hashes[i].set(hash, series)
|
||||||
|
|
3
vendor/github.com/prometheus/tsdb/index.go
generated
vendored
3
vendor/github.com/prometheus/tsdb/index.go
generated
vendored
|
@ -570,6 +570,9 @@ var (
|
||||||
errInvalidFlag = fmt.Errorf("invalid flag")
|
errInvalidFlag = fmt.Errorf("invalid flag")
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// NewIndexReader returns a new IndexReader on the given directory.
|
||||||
|
func NewIndexReader(dir string) (IndexReader, error) { return newIndexReader(dir) }
|
||||||
|
|
||||||
// newIndexReader returns a new indexReader on the given directory.
|
// newIndexReader returns a new indexReader on the given directory.
|
||||||
func newIndexReader(dir string) (*indexReader, error) {
|
func newIndexReader(dir string) (*indexReader, error) {
|
||||||
f, err := openMmapFile(filepath.Join(dir, "index"))
|
f, err := openMmapFile(filepath.Join(dir, "index"))
|
||||||
|
|
15
vendor/github.com/prometheus/tsdb/wal.go
generated
vendored
15
vendor/github.com/prometheus/tsdb/wal.go
generated
vendored
|
@ -99,9 +99,6 @@ type WALReader interface {
|
||||||
type RefSeries struct {
|
type RefSeries struct {
|
||||||
Ref uint64
|
Ref uint64
|
||||||
Labels labels.Labels
|
Labels labels.Labels
|
||||||
|
|
||||||
// hash for the label set. This field is not generally populated.
|
|
||||||
hash uint64
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// RefSample is a timestamp/value pair associated with a reference to a series.
|
// RefSample is a timestamp/value pair associated with a reference to a series.
|
||||||
|
@ -827,7 +824,9 @@ func (r *walReader) Read(seriesf SeriesCB, samplesf SamplesCB, deletesf DeletesC
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.Wrap(err, "decode series entry")
|
return errors.Wrap(err, "decode series entry")
|
||||||
}
|
}
|
||||||
seriesf(series)
|
if err := seriesf(series); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
cf := r.current()
|
cf := r.current()
|
||||||
|
|
||||||
|
@ -842,7 +841,9 @@ func (r *walReader) Read(seriesf SeriesCB, samplesf SamplesCB, deletesf DeletesC
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.Wrap(err, "decode samples entry")
|
return errors.Wrap(err, "decode samples entry")
|
||||||
}
|
}
|
||||||
samplesf(samples)
|
if err := samplesf(samples); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
// Update the times for the WAL segment file.
|
// Update the times for the WAL segment file.
|
||||||
cf := r.current()
|
cf := r.current()
|
||||||
|
@ -858,7 +859,9 @@ func (r *walReader) Read(seriesf SeriesCB, samplesf SamplesCB, deletesf DeletesC
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.Wrap(err, "decode delete entry")
|
return errors.Wrap(err, "decode delete entry")
|
||||||
}
|
}
|
||||||
deletesf(stones)
|
if err := deletesf(stones); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
// Update the times for the WAL segment file.
|
// Update the times for the WAL segment file.
|
||||||
|
|
||||||
cf := r.current()
|
cf := r.current()
|
||||||
|
|
14
vendor/vendor.json
vendored
14
vendor/vendor.json
vendored
|
@ -871,22 +871,22 @@
|
||||||
"revisionTime": "2016-04-11T19:08:41Z"
|
"revisionTime": "2016-04-11T19:08:41Z"
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"checksumSHA1": "mDKxPAubVLTWW/Gar13m7YDHSek=",
|
"checksumSHA1": "B5ndMoK8lqgFJ8xUZ/0V4zCpUw0=",
|
||||||
"path": "github.com/prometheus/tsdb",
|
"path": "github.com/prometheus/tsdb",
|
||||||
"revision": "3870ec285c4640d462a0ad80e7acbcdf1e939563",
|
"revision": "162a48e4f2c6e486a0ebf61cf9cea73a8023ef0a",
|
||||||
"revisionTime": "2017-09-11T08:41:33Z"
|
"revisionTime": "2017-09-19T08:20:19Z"
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"checksumSHA1": "Gua979gmISm4cJP/fR2hL8m5To8=",
|
"checksumSHA1": "Gua979gmISm4cJP/fR2hL8m5To8=",
|
||||||
"path": "github.com/prometheus/tsdb/chunks",
|
"path": "github.com/prometheus/tsdb/chunks",
|
||||||
"revision": "3870ec285c4640d462a0ad80e7acbcdf1e939563",
|
"revision": "162a48e4f2c6e486a0ebf61cf9cea73a8023ef0a",
|
||||||
"revisionTime": "2017-09-11T08:41:33Z"
|
"revisionTime": "2017-09-19T08:20:19Z"
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"checksumSHA1": "zhmlvc322RH1L3l9DaA9d/HVVWs=",
|
"checksumSHA1": "zhmlvc322RH1L3l9DaA9d/HVVWs=",
|
||||||
"path": "github.com/prometheus/tsdb/labels",
|
"path": "github.com/prometheus/tsdb/labels",
|
||||||
"revision": "3870ec285c4640d462a0ad80e7acbcdf1e939563",
|
"revision": "162a48e4f2c6e486a0ebf61cf9cea73a8023ef0a",
|
||||||
"revisionTime": "2017-09-11T08:41:33Z"
|
"revisionTime": "2017-09-19T08:20:19Z"
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"checksumSHA1": "5SYLEhADhdBVZAGPVHWggQl7H8k=",
|
"checksumSHA1": "5SYLEhADhdBVZAGPVHWggQl7H8k=",
|
||||||
|
|
|
@ -46,8 +46,8 @@ import (
|
||||||
type API struct {
|
type API struct {
|
||||||
enableAdmin bool
|
enableAdmin bool
|
||||||
now func() time.Time
|
now func() time.Time
|
||||||
db *tsdb.DB
|
db func() *tsdb.DB
|
||||||
q func(mint, maxt int64) storage.Querier
|
q func(mint, maxt int64) (storage.Querier, error)
|
||||||
targets func() []*retrieval.Target
|
targets func() []*retrieval.Target
|
||||||
alertmanagers func() []*url.URL
|
alertmanagers func() []*url.URL
|
||||||
}
|
}
|
||||||
|
@ -55,9 +55,9 @@ type API struct {
|
||||||
// New returns a new API object.
|
// New returns a new API object.
|
||||||
func New(
|
func New(
|
||||||
now func() time.Time,
|
now func() time.Time,
|
||||||
db *tsdb.DB,
|
db func() *tsdb.DB,
|
||||||
qe *promql.Engine,
|
qe *promql.Engine,
|
||||||
q func(mint, maxt int64) storage.Querier,
|
q func(mint, maxt int64) (storage.Querier, error),
|
||||||
targets func() []*retrieval.Target,
|
targets func() []*retrieval.Target,
|
||||||
alertmanagers func() []*url.URL,
|
alertmanagers func() []*url.URL,
|
||||||
enableAdmin bool,
|
enableAdmin bool,
|
||||||
|
@ -149,28 +149,31 @@ func (s *adminDisabled) DeleteSeries(_ context.Context, r *pb.SeriesDeleteReques
|
||||||
|
|
||||||
// Admin provides an administration interface to Prometheus.
|
// Admin provides an administration interface to Prometheus.
|
||||||
type Admin struct {
|
type Admin struct {
|
||||||
db *tsdb.DB
|
db func() *tsdb.DB
|
||||||
snapdir string
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewAdmin returns a Admin server.
|
// NewAdmin returns a Admin server.
|
||||||
func NewAdmin(db *tsdb.DB) *Admin {
|
func NewAdmin(db func() *tsdb.DB) *Admin {
|
||||||
return &Admin{
|
return &Admin{
|
||||||
db: db,
|
db: db,
|
||||||
snapdir: filepath.Join(db.Dir(), "snapshots"),
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// TSDBSnapshot implements pb.AdminServer.
|
// TSDBSnapshot implements pb.AdminServer.
|
||||||
func (s *Admin) TSDBSnapshot(_ context.Context, _ *pb.TSDBSnapshotRequest) (*pb.TSDBSnapshotResponse, error) {
|
func (s *Admin) TSDBSnapshot(_ context.Context, _ *pb.TSDBSnapshotRequest) (*pb.TSDBSnapshotResponse, error) {
|
||||||
|
db := s.db()
|
||||||
|
if db == nil {
|
||||||
|
return nil, status.Errorf(codes.Unavailable, "TSDB not ready")
|
||||||
|
}
|
||||||
var (
|
var (
|
||||||
name = fmt.Sprintf("%s-%x", time.Now().UTC().Format(time.RFC3339), rand.Int())
|
snapdir = filepath.Join(db.Dir(), "snapshots")
|
||||||
dir = filepath.Join(s.snapdir, name)
|
name = fmt.Sprintf("%s-%x", time.Now().UTC().Format(time.RFC3339), rand.Int())
|
||||||
|
dir = filepath.Join(snapdir, name)
|
||||||
)
|
)
|
||||||
if err := os.MkdirAll(dir, 0777); err != nil {
|
if err := os.MkdirAll(dir, 0777); err != nil {
|
||||||
return nil, status.Errorf(codes.Internal, "created snapshot directory: %s", err)
|
return nil, status.Errorf(codes.Internal, "created snapshot directory: %s", err)
|
||||||
}
|
}
|
||||||
if err := s.db.Snapshot(dir); err != nil {
|
if err := db.Snapshot(dir); err != nil {
|
||||||
return nil, status.Errorf(codes.Internal, "create snapshot: %s", err)
|
return nil, status.Errorf(codes.Internal, "create snapshot: %s", err)
|
||||||
}
|
}
|
||||||
return &pb.TSDBSnapshotResponse{Name: name}, nil
|
return &pb.TSDBSnapshotResponse{Name: name}, nil
|
||||||
|
@ -210,7 +213,11 @@ func (s *Admin) DeleteSeries(_ context.Context, r *pb.SeriesDeleteRequest) (*pb.
|
||||||
|
|
||||||
matchers = append(matchers, lm)
|
matchers = append(matchers, lm)
|
||||||
}
|
}
|
||||||
if err := s.db.Delete(timestamp.FromTime(mint), timestamp.FromTime(maxt), matchers...); err != nil {
|
db := s.db()
|
||||||
|
if db == nil {
|
||||||
|
return nil, status.Errorf(codes.Unavailable, "TSDB not ready")
|
||||||
|
}
|
||||||
|
if err := db.Delete(timestamp.FromTime(mint), timestamp.FromTime(maxt), matchers...); err != nil {
|
||||||
return nil, status.Error(codes.Internal, err.Error())
|
return nil, status.Error(codes.Internal, err.Error())
|
||||||
}
|
}
|
||||||
return &pb.SeriesDeleteResponse{}, nil
|
return &pb.SeriesDeleteResponse{}, nil
|
||||||
|
|
38
web/web.go
38
web/web.go
|
@ -47,7 +47,6 @@ import (
|
||||||
"github.com/prometheus/common/model"
|
"github.com/prometheus/common/model"
|
||||||
"github.com/prometheus/common/route"
|
"github.com/prometheus/common/route"
|
||||||
"github.com/prometheus/prometheus/storage"
|
"github.com/prometheus/prometheus/storage"
|
||||||
ptsdb "github.com/prometheus/prometheus/storage/tsdb"
|
|
||||||
"github.com/prometheus/tsdb"
|
"github.com/prometheus/tsdb"
|
||||||
"golang.org/x/net/context"
|
"golang.org/x/net/context"
|
||||||
"golang.org/x/net/netutil"
|
"golang.org/x/net/netutil"
|
||||||
|
@ -75,7 +74,7 @@ type Handler struct {
|
||||||
ruleManager *rules.Manager
|
ruleManager *rules.Manager
|
||||||
queryEngine *promql.Engine
|
queryEngine *promql.Engine
|
||||||
context context.Context
|
context context.Context
|
||||||
tsdb *tsdb.DB
|
tsdb func() *tsdb.DB
|
||||||
storage storage.Storage
|
storage storage.Storage
|
||||||
notifier *notifier.Notifier
|
notifier *notifier.Notifier
|
||||||
|
|
||||||
|
@ -122,7 +121,8 @@ type PrometheusVersion struct {
|
||||||
// Options for the web Handler.
|
// Options for the web Handler.
|
||||||
type Options struct {
|
type Options struct {
|
||||||
Context context.Context
|
Context context.Context
|
||||||
Storage *tsdb.DB
|
TSDB func() *tsdb.DB
|
||||||
|
Storage storage.Storage
|
||||||
QueryEngine *promql.Engine
|
QueryEngine *promql.Engine
|
||||||
TargetManager *retrieval.TargetManager
|
TargetManager *retrieval.TargetManager
|
||||||
RuleManager *rules.Manager
|
RuleManager *rules.Manager
|
||||||
|
@ -171,8 +171,8 @@ func New(logger log.Logger, o *Options) *Handler {
|
||||||
targetManager: o.TargetManager,
|
targetManager: o.TargetManager,
|
||||||
ruleManager: o.RuleManager,
|
ruleManager: o.RuleManager,
|
||||||
queryEngine: o.QueryEngine,
|
queryEngine: o.QueryEngine,
|
||||||
tsdb: o.Storage,
|
tsdb: o.TSDB,
|
||||||
storage: ptsdb.Adapter(o.Storage),
|
storage: o.Storage,
|
||||||
notifier: o.Notifier,
|
notifier: o.Notifier,
|
||||||
|
|
||||||
now: model.Now,
|
now: model.Now,
|
||||||
|
@ -213,7 +213,7 @@ func New(logger log.Logger, o *Options) *Handler {
|
||||||
router.Get("/targets", readyf(instrf("targets", h.targets)))
|
router.Get("/targets", readyf(instrf("targets", h.targets)))
|
||||||
router.Get("/version", readyf(instrf("version", h.version)))
|
router.Get("/version", readyf(instrf("version", h.version)))
|
||||||
|
|
||||||
router.Get("/heap", readyf(instrf("heap", h.dumpHeap)))
|
router.Get("/heap", instrf("heap", h.dumpHeap))
|
||||||
|
|
||||||
router.Get("/metrics", prometheus.Handler().ServeHTTP)
|
router.Get("/metrics", prometheus.Handler().ServeHTTP)
|
||||||
|
|
||||||
|
@ -223,24 +223,24 @@ func New(logger log.Logger, o *Options) *Handler {
|
||||||
|
|
||||||
router.Get("/consoles/*filepath", readyf(instrf("consoles", h.consoles)))
|
router.Get("/consoles/*filepath", readyf(instrf("consoles", h.consoles)))
|
||||||
|
|
||||||
router.Get("/static/*filepath", readyf(instrf("static", h.serveStaticAsset)))
|
router.Get("/static/*filepath", instrf("static", h.serveStaticAsset))
|
||||||
|
|
||||||
if o.UserAssetsPath != "" {
|
if o.UserAssetsPath != "" {
|
||||||
router.Get("/user/*filepath", readyf(instrf("user", route.FileServe(o.UserAssetsPath))))
|
router.Get("/user/*filepath", instrf("user", route.FileServe(o.UserAssetsPath)))
|
||||||
}
|
}
|
||||||
|
|
||||||
if o.EnableLifecycle {
|
if o.EnableLifecycle {
|
||||||
router.Post("/-/quit", h.quit)
|
router.Post("/-/quit", h.quit)
|
||||||
router.Post("/-/reload", h.reload)
|
router.Post("/-/reload", h.reload)
|
||||||
} else {
|
} else {
|
||||||
router.Post("/-/quit", readyf(func(w http.ResponseWriter, _ *http.Request) {
|
router.Post("/-/quit", func(w http.ResponseWriter, _ *http.Request) {
|
||||||
w.WriteHeader(http.StatusForbidden)
|
w.WriteHeader(http.StatusForbidden)
|
||||||
w.Write([]byte("Lifecycle APIs are not enabled"))
|
w.Write([]byte("Lifecycle APIs are not enabled"))
|
||||||
}))
|
})
|
||||||
router.Post("/-/reload", readyf(func(w http.ResponseWriter, _ *http.Request) {
|
router.Post("/-/reload", func(w http.ResponseWriter, _ *http.Request) {
|
||||||
w.WriteHeader(http.StatusForbidden)
|
w.WriteHeader(http.StatusForbidden)
|
||||||
w.Write([]byte("Lifecycle APIs are not enabled"))
|
w.Write([]byte("Lifecycle APIs are not enabled"))
|
||||||
}))
|
})
|
||||||
}
|
}
|
||||||
router.Get("/-/quit", func(w http.ResponseWriter, _ *http.Request) {
|
router.Get("/-/quit", func(w http.ResponseWriter, _ *http.Request) {
|
||||||
w.WriteHeader(http.StatusMethodNotAllowed)
|
w.WriteHeader(http.StatusMethodNotAllowed)
|
||||||
|
@ -251,8 +251,8 @@ func New(logger log.Logger, o *Options) *Handler {
|
||||||
w.Write([]byte("Only POST requests allowed"))
|
w.Write([]byte("Only POST requests allowed"))
|
||||||
})
|
})
|
||||||
|
|
||||||
router.Get("/debug/*subpath", readyf(serveDebug))
|
router.Get("/debug/*subpath", serveDebug)
|
||||||
router.Post("/debug/*subpath", readyf(serveDebug))
|
router.Post("/debug/*subpath", serveDebug)
|
||||||
|
|
||||||
router.Get("/-/healthy", func(w http.ResponseWriter, r *http.Request) {
|
router.Get("/-/healthy", func(w http.ResponseWriter, r *http.Request) {
|
||||||
w.WriteHeader(http.StatusOK)
|
w.WriteHeader(http.StatusOK)
|
||||||
|
@ -380,15 +380,9 @@ func (h *Handler) Run(ctx context.Context) error {
|
||||||
)
|
)
|
||||||
av2 := api_v2.New(
|
av2 := api_v2.New(
|
||||||
time.Now,
|
time.Now,
|
||||||
h.options.Storage,
|
h.options.TSDB,
|
||||||
h.options.QueryEngine,
|
h.options.QueryEngine,
|
||||||
func(mint, maxt int64) storage.Querier {
|
h.options.Storage.Querier,
|
||||||
q, err := ptsdb.Adapter(h.options.Storage).Querier(mint, maxt)
|
|
||||||
if err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
return q
|
|
||||||
},
|
|
||||||
func() []*retrieval.Target {
|
func() []*retrieval.Target {
|
||||||
return h.options.TargetManager.Targets()
|
return h.options.TargetManager.Targets()
|
||||||
},
|
},
|
||||||
|
|
|
@ -19,6 +19,8 @@ import (
|
||||||
"net/url"
|
"net/url"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/prometheus/prometheus/storage/tsdb"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestGlobalURL(t *testing.T) {
|
func TestGlobalURL(t *testing.T) {
|
||||||
|
@ -78,7 +80,7 @@ func TestReadyAndHealthy(t *testing.T) {
|
||||||
ReadTimeout: 30 * time.Second,
|
ReadTimeout: 30 * time.Second,
|
||||||
MaxConnections: 512,
|
MaxConnections: 512,
|
||||||
Context: nil,
|
Context: nil,
|
||||||
Storage: nil,
|
Storage: &tsdb.ReadyStorage{},
|
||||||
QueryEngine: nil,
|
QueryEngine: nil,
|
||||||
TargetManager: nil,
|
TargetManager: nil,
|
||||||
RuleManager: nil,
|
RuleManager: nil,
|
||||||
|
@ -155,7 +157,7 @@ func TestRoutePrefix(t *testing.T) {
|
||||||
ReadTimeout: 30 * time.Second,
|
ReadTimeout: 30 * time.Second,
|
||||||
MaxConnections: 512,
|
MaxConnections: 512,
|
||||||
Context: nil,
|
Context: nil,
|
||||||
Storage: nil,
|
Storage: &tsdb.ReadyStorage{},
|
||||||
QueryEngine: nil,
|
QueryEngine: nil,
|
||||||
TargetManager: nil,
|
TargetManager: nil,
|
||||||
RuleManager: nil,
|
RuleManager: nil,
|
||||||
|
|
Loading…
Reference in a new issue