Moved readyStorage to main.

Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>
This commit is contained in:
Bartlomiej Plotka 2020-02-17 11:41:04 +00:00
parent 8a775bc468
commit a20bebf7eb
3 changed files with 90 additions and 89 deletions

View file

@ -17,6 +17,7 @@ package main
import (
"context"
"fmt"
"math"
"net"
"net/http"
_ "net/http/pprof" // Comment this line to disable pprof endpoint.
@ -49,6 +50,7 @@ import (
"github.com/prometheus/prometheus/discovery"
sd_config "github.com/prometheus/prometheus/discovery/config"
"github.com/prometheus/prometheus/notifier"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/pkg/logging"
"github.com/prometheus/prometheus/pkg/relabel"
prom_runtime "github.com/prometheus/prometheus/pkg/runtime"
@ -335,7 +337,7 @@ func main() {
level.Info(logger).Log("vm_limits", prom_runtime.VmLimits())
var (
localStorage = &tsdb.ReadyStorage{}
localStorage = &readyStorage{}
remoteStorage = remote.NewStorage(log.With(logger, "component", "remote"), prometheus.DefaultRegisterer, localStorage.StartTime, cfg.localStoragePath, time.Duration(cfg.RemoteFlushDeadline))
fanoutStorage = storage.NewFanout(logger, localStorage, remoteStorage)
)
@ -890,3 +892,88 @@ func sendAlerts(s sender, externalURL string) rules.NotifyFunc {
}
}
}
// readyStorage implements the Storage interface while allowing to set the actual
// storage at a later point in time.
type readyStorage struct {
mtx sync.RWMutex
db *tsdb.DB
startTimeMargin int64
}
// Set the storage.
func (s *readyStorage) Set(db *tsdb.DB, startTimeMargin int64) {
s.mtx.Lock()
defer s.mtx.Unlock()
s.db = db
s.startTimeMargin = startTimeMargin
}
// Get the storage.
func (s *readyStorage) Get() *tsdb.DB {
if x := s.get(); x != nil {
return x
}
return nil
}
func (s *readyStorage) get() *tsdb.DB {
s.mtx.RLock()
x := s.db
s.mtx.RUnlock()
return x
}
// StartTime implements the Storage interface.
func (s *readyStorage) StartTime() (int64, error) {
if x := s.get(); x != nil {
var startTime int64
if len(x.Blocks()) > 0 {
startTime = x.Blocks()[0].Meta().MinTime
} else {
startTime = time.Now().Unix() * 1000
}
// Add a safety margin as it may take a few minutes for everything to spin up.
return startTime + s.startTimeMargin, nil
}
return math.MaxInt64, tsdb.ErrNotReady
}
// Querier implements the Storage interface.
func (s *readyStorage) Querier(ctx context.Context, mint, maxt int64) (storage.Querier, error) {
if x := s.get(); x != nil {
return x.Querier(ctx, mint, maxt)
}
return nil, tsdb.ErrNotReady
}
// Appender implements the Storage interface.
func (s *readyStorage) Appender() storage.Appender {
if x := s.get(); x != nil {
return x.Appender()
}
return notReadyAppender{}
}
type notReadyAppender struct{}
func (n notReadyAppender) Add(l labels.Labels, t int64, v float64) (uint64, error) {
return 0, tsdb.ErrNotReady
}
func (n notReadyAppender) AddFast(ref uint64, t int64, v float64) error { return tsdb.ErrNotReady }
func (n notReadyAppender) Commit() error { return tsdb.ErrNotReady }
func (n notReadyAppender) Rollback() error { return tsdb.ErrNotReady }
// Close implements the Storage interface.
func (s *readyStorage) Close() error {
if x := s.Get(); x != nil {
return x.Close()
}
return nil
}

View file

@ -1455,88 +1455,3 @@ func exponential(d, min, max time.Duration) time.Duration {
}
return d
}
// ReadyStorage implements the Storage interface while allowing to set the actual
// storage at a later point in time.
type ReadyStorage struct {
mtx sync.RWMutex
db *DB
startTimeMargin int64
}
// Set the storage.
func (s *ReadyStorage) Set(db *DB, startTimeMargin int64) {
s.mtx.Lock()
defer s.mtx.Unlock()
s.db = db
s.startTimeMargin = startTimeMargin
}
// Get the storage.
func (s *ReadyStorage) Get() *DB {
if x := s.get(); x != nil {
return x
}
return nil
}
func (s *ReadyStorage) get() *DB {
s.mtx.RLock()
x := s.db
s.mtx.RUnlock()
return x
}
// StartTime implements the Storage interface.
func (s *ReadyStorage) StartTime() (int64, error) {
if x := s.get(); x != nil {
var startTime int64
if len(x.Blocks()) > 0 {
startTime = x.Blocks()[0].Meta().MinTime
} else {
startTime = time.Now().Unix() * 1000
}
// Add a safety margin as it may take a few minutes for everything to spin up.
return startTime + s.startTimeMargin, nil
}
return math.MaxInt64, ErrNotReady
}
// Querier implements the Storage interface.
func (s *ReadyStorage) Querier(ctx context.Context, mint, maxt int64) (storage.Querier, error) {
if x := s.get(); x != nil {
return x.Querier(ctx, mint, maxt)
}
return nil, ErrNotReady
}
// Appender implements the Storage interface.
func (s *ReadyStorage) Appender() storage.Appender {
if x := s.get(); x != nil {
return x.Appender()
}
return notReadyAppender{}
}
type notReadyAppender struct{}
func (n notReadyAppender) Add(l labels.Labels, t int64, v float64) (uint64, error) {
return 0, ErrNotReady
}
func (n notReadyAppender) AddFast(ref uint64, t int64, v float64) error { return ErrNotReady }
func (n notReadyAppender) Commit() error { return ErrNotReady }
func (n notReadyAppender) Rollback() error { return ErrNotReady }
// Close implements the Storage interface.
func (s *ReadyStorage) Close() error {
if x := s.Get(); x != nil {
return x.Close()
}
return nil
}

View file

@ -28,7 +28,6 @@ import (
"github.com/prometheus/client_golang/prometheus"
prom_testutil "github.com/prometheus/client_golang/prometheus/testutil"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/notifier"
"github.com/prometheus/prometheus/rules"
@ -106,7 +105,7 @@ func TestReadyAndHealthy(t *testing.T) {
ReadTimeout: 30 * time.Second,
MaxConnections: 512,
Context: nil,
Storage: &tsdb.ReadyStorage{},
Storage: nil,
QueryEngine: nil,
ScrapeManager: &scrape.Manager{},
RuleManager: &rules.Manager{},
@ -297,7 +296,7 @@ func TestRoutePrefix(t *testing.T) {
ReadTimeout: 30 * time.Second,
MaxConnections: 512,
Context: nil,
Storage: &tsdb.ReadyStorage{},
Storage: nil,
QueryEngine: nil,
ScrapeManager: nil,
RuleManager: nil,