Make remote flush deadline a command line param.

Signed-off-by: Tom Wilkie <tom.wilkie@gmail.com>
This commit is contained in:
Tom Wilkie 2018-05-23 15:03:54 +01:00
parent a6c353613a
commit e51d6c4b6c
5 changed files with 34 additions and 25 deletions

View file

@ -92,6 +92,7 @@ func main() {
webTimeout model.Duration webTimeout model.Duration
queryTimeout model.Duration queryTimeout model.Duration
queryConcurrency int queryConcurrency int
RemoteFlushDeadline model.Duration
prometheusURL string prometheusURL string
@ -160,6 +161,9 @@ func main() {
a.Flag("storage.tsdb.no-lockfile", "Do not create lockfile in data directory."). a.Flag("storage.tsdb.no-lockfile", "Do not create lockfile in data directory.").
Default("false").BoolVar(&cfg.tsdb.NoLockfile) Default("false").BoolVar(&cfg.tsdb.NoLockfile)
a.Flag("storage.remote.flush-deadline", "How long to wait flushing sample on shutdown or config reload.").
Default(1 * time.Minute).PlaceHolder("<duration>").SetValue(&cfg.RemoteFlushDeadline)
a.Flag("alertmanager.notification-queue-capacity", "The capacity of the queue for pending Alertmanager notifications."). a.Flag("alertmanager.notification-queue-capacity", "The capacity of the queue for pending Alertmanager notifications.").
Default("10000").IntVar(&cfg.notifier.QueueCapacity) Default("10000").IntVar(&cfg.notifier.QueueCapacity)
@ -222,7 +226,7 @@ func main() {
var ( var (
localStorage = &tsdb.ReadyStorage{} localStorage = &tsdb.ReadyStorage{}
remoteStorage = remote.NewStorage(log.With(logger, "component", "remote"), localStorage.StartTime) remoteStorage = remote.NewStorage(log.With(logger, "component", "remote"), localStorage.StartTime, cfg.RemoteFlushDeadline)
fanoutStorage = storage.NewFanout(logger, localStorage, remoteStorage) fanoutStorage = storage.NewFanout(logger, localStorage, remoteStorage)
) )

View file

@ -122,8 +122,6 @@ var (
MaxRetries: 10, MaxRetries: 10,
MinBackoff: 30 * time.Millisecond, MinBackoff: 30 * time.Millisecond,
MaxBackoff: 100 * time.Millisecond, MaxBackoff: 100 * time.Millisecond,
FlushDeadline: 1 * time.Minute,
} }
// DefaultRemoteReadConfig is the default remote read configuration. // DefaultRemoteReadConfig is the default remote read configuration.
@ -648,10 +646,6 @@ type QueueConfig struct {
// On recoverable errors, backoff exponentially. // On recoverable errors, backoff exponentially.
MinBackoff time.Duration `yaml:"min_backoff,omitempty"` MinBackoff time.Duration `yaml:"min_backoff,omitempty"`
MaxBackoff time.Duration `yaml:"max_backoff,omitempty"` MaxBackoff time.Duration `yaml:"max_backoff,omitempty"`
// On shutdown or config reload allow the following duration for flushing
// pending samples, otherwise continue without waiting.
FlushDeadline time.Duration `yaml:"flush_deadline"`
} }
// RemoteReadConfig is the configuration for reading from remote storage. // RemoteReadConfig is the configuration for reading from remote storage.

View file

@ -140,6 +140,7 @@ type StorageClient interface {
type QueueManager struct { type QueueManager struct {
logger log.Logger logger log.Logger
flushDeadline time.Duration
cfg config.QueueConfig cfg config.QueueConfig
externalLabels model.LabelSet externalLabels model.LabelSet
relabelConfigs []*config.RelabelConfig relabelConfigs []*config.RelabelConfig
@ -159,12 +160,13 @@ type QueueManager struct {
} }
// NewQueueManager builds a new QueueManager. // NewQueueManager builds a new QueueManager.
func NewQueueManager(logger log.Logger, cfg config.QueueConfig, externalLabels model.LabelSet, relabelConfigs []*config.RelabelConfig, client StorageClient) *QueueManager { func NewQueueManager(logger log.Logger, cfg config.QueueConfig, externalLabels model.LabelSet, relabelConfigs []*config.RelabelConfig, client StorageClient, flushDeadline time.Duration) *QueueManager {
if logger == nil { if logger == nil {
logger = log.NewNopLogger() logger = log.NewNopLogger()
} }
t := &QueueManager{ t := &QueueManager{
logger: logger, logger: logger,
flushDeadline: flushDeadline,
cfg: cfg, cfg: cfg,
externalLabels: externalLabels, externalLabels: externalLabels,
relabelConfigs: relabelConfigs, relabelConfigs: relabelConfigs,
@ -256,7 +258,7 @@ func (t *QueueManager) Stop() {
t.shardsMtx.Lock() t.shardsMtx.Lock()
defer t.shardsMtx.Unlock() defer t.shardsMtx.Unlock()
t.shards.stop(t.cfg.FlushDeadline) t.shards.stop(t.flushDeadline)
level.Info(t.logger).Log("msg", "Remote storage stopped.") level.Info(t.logger).Log("msg", "Remote storage stopped.")
} }
@ -361,7 +363,7 @@ func (t *QueueManager) reshard(n int) {
t.shards = newShards t.shards = newShards
t.shardsMtx.Unlock() t.shardsMtx.Unlock()
oldShards.stop(t.cfg.FlushDeadline) oldShards.stop(t.flushDeadline)
// We start the newShards after we have stopped (the therefore completely // We start the newShards after we have stopped (the therefore completely
// flushed) the oldShards, to guarantee we only every deliver samples in // flushed) the oldShards, to guarantee we only every deliver samples in

View file

@ -26,6 +26,8 @@ import (
"github.com/prometheus/prometheus/prompb" "github.com/prometheus/prometheus/prompb"
) )
const defaultFlushDeadline = 1 * time.Minute
type TestStorageClient struct { type TestStorageClient struct {
receivedSamples map[string][]*prompb.Sample receivedSamples map[string][]*prompb.Sample
expectedSamples map[string][]*prompb.Sample expectedSamples map[string][]*prompb.Sample
@ -109,7 +111,7 @@ func TestSampleDelivery(t *testing.T) {
cfg := config.DefaultQueueConfig cfg := config.DefaultQueueConfig
cfg.MaxShards = 1 cfg.MaxShards = 1
m := NewQueueManager(nil, cfg, nil, nil, c) m := NewQueueManager(nil, cfg, nil, nil, c, defaultFlushDeadline)
// These should be received by the client. // These should be received by the client.
for _, s := range samples[:len(samples)/2] { for _, s := range samples[:len(samples)/2] {
@ -145,7 +147,7 @@ func TestSampleDeliveryTimeout(t *testing.T) {
cfg := config.DefaultQueueConfig cfg := config.DefaultQueueConfig
cfg.MaxShards = 1 cfg.MaxShards = 1
cfg.BatchSendDeadline = 100 * time.Millisecond cfg.BatchSendDeadline = 100 * time.Millisecond
m := NewQueueManager(nil, cfg, nil, nil, c) m := NewQueueManager(nil, cfg, nil, nil, c, defaultFlushDeadline)
m.Start() m.Start()
defer m.Stop() defer m.Stop()
@ -181,7 +183,7 @@ func TestSampleDeliveryOrder(t *testing.T) {
c := NewTestStorageClient() c := NewTestStorageClient()
c.expectSamples(samples) c.expectSamples(samples)
m := NewQueueManager(nil, config.DefaultQueueConfig, nil, nil, c) m := NewQueueManager(nil, config.DefaultQueueConfig, nil, nil, c, defaultFlushDeadline)
// These should be received by the client. // These should be received by the client.
for _, s := range samples { for _, s := range samples {
@ -259,7 +261,7 @@ func TestSpawnNotMoreThanMaxConcurrentSendsGoroutines(t *testing.T) {
cfg := config.DefaultQueueConfig cfg := config.DefaultQueueConfig
cfg.MaxShards = 1 cfg.MaxShards = 1
cfg.Capacity = n cfg.Capacity = n
m := NewQueueManager(nil, cfg, nil, nil, c) m := NewQueueManager(nil, cfg, nil, nil, c, defaultFlushDeadline)
m.Start() m.Start()

View file

@ -16,6 +16,7 @@ package remote
import ( import (
"context" "context"
"sync" "sync"
"time"
"github.com/go-kit/kit/log" "github.com/go-kit/kit/log"
"github.com/prometheus/common/model" "github.com/prometheus/common/model"
@ -39,14 +40,19 @@ type Storage struct {
// For reads // For reads
queryables []storage.Queryable queryables []storage.Queryable
localStartTimeCallback startTimeCallback localStartTimeCallback startTimeCallback
flushDeadline time.Duration
} }
// NewStorage returns a remote.Storage. // NewStorage returns a remote.Storage.
func NewStorage(l log.Logger, stCallback startTimeCallback) *Storage { func NewStorage(l log.Logger, stCallback startTimeCallback, flushDeadline time.Duration) *Storage {
if l == nil { if l == nil {
l = log.NewNopLogger() l = log.NewNopLogger()
} }
return &Storage{logger: l, localStartTimeCallback: stCallback} return &Storage{
logger: l,
localStartTimeCallback: stCallback,
flushDeadline: flushDeadline,
}
} }
// ApplyConfig updates the state as the new config requires. // ApplyConfig updates the state as the new config requires.
@ -74,6 +80,7 @@ func (s *Storage) ApplyConfig(conf *config.Config) error {
conf.GlobalConfig.ExternalLabels, conf.GlobalConfig.ExternalLabels,
rwConf.WriteRelabelConfigs, rwConf.WriteRelabelConfigs,
c, c,
s.flushDeadline,
)) ))
} }