From f3c61f8bb21e239c5fde4c07c6d018cc712ae20f Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Wed, 31 Jan 2018 15:41:48 +0000 Subject: [PATCH 1/7] Only give remote queues 1 minute to flush samples on shutdown. Signed-off-by: Tom Wilkie --- storage/remote/queue_manager.go | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go index 41e001c066..fb931c2f25 100644 --- a/storage/remote/queue_manager.go +++ b/storage/remote/queue_manager.go @@ -46,6 +46,9 @@ const ( // Limit to 1 log event every 10s logRateLimit = 0.1 logBurst = 10 + + // Allow 1 minute to flush samples`, then exit anyway. + stopFlushDeadline = 1 * time.Minute ) var ( @@ -403,7 +406,18 @@ func (s *shards) stop() { for _, shard := range s.queues { close(shard) } - s.wg.Wait() + + done := make(chan struct{}) + go func() { + s.wg.Wait() + close(done) + }() + + select { + case <-done: + case <-time.After(stopFlushDeadline): + level.Error(s.qm.logger).Log("msg", "Failed to flush all samples on shutdown") + } } func (s *shards) enqueue(sample *model.Sample) bool { From aa17263edd0db232e63c7b0a4c4fda93147d6a65 Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Thu, 1 Feb 2018 13:20:38 +0000 Subject: [PATCH 2/7] Remove WaitGroup and extra goroutine. Signed-off-by: Tom Wilkie --- storage/remote/queue_manager.go | 32 ++++++++++++++++---------------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go index fb931c2f25..abe0caf7cd 100644 --- a/storage/remote/queue_manager.go +++ b/storage/remote/queue_manager.go @@ -16,6 +16,7 @@ package remote import ( "math" "sync" + "sync/atomic" "time" "golang.org/x/time/rate" @@ -372,10 +373,10 @@ func (t *QueueManager) reshard(n int) { } type shards struct { - qm *QueueManager - queues []chan *model.Sample - done chan struct{} - wg sync.WaitGroup + qm *QueueManager + queues []chan *model.Sample + done chan struct{} + running int32 } func (t *QueueManager) newShards(numShards int) *shards { @@ -384,11 +385,11 @@ func (t *QueueManager) newShards(numShards int) *shards { queues[i] = make(chan *model.Sample, t.cfg.Capacity) } s := &shards{ - qm: t, - queues: queues, - done: make(chan struct{}), + qm: t, + queues: queues, + done: make(chan struct{}), + running: int32(numShards), } - s.wg.Add(numShards) return s } @@ -407,14 +408,8 @@ func (s *shards) stop() { close(shard) } - done := make(chan struct{}) - go func() { - s.wg.Wait() - close(done) - }() - select { - case <-done: + case <-s.done: case <-time.After(stopFlushDeadline): level.Error(s.qm.logger).Log("msg", "Failed to flush all samples on shutdown") } @@ -435,7 +430,12 @@ func (s *shards) enqueue(sample *model.Sample) bool { } func (s *shards) runShard(i int) { - defer s.wg.Done() + defer func() { + if atomic.AddInt32(&s.running, -1) == 0 { + close(s.done) + } + }() + queue := s.queues[i] // Send batches of at most MaxSamplesPerSend samples to the remote storage. From a6c353613a90230a5f7ae8ebc358373e3f09ef02 Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Thu, 1 Feb 2018 13:25:15 +0000 Subject: [PATCH 3/7] Make the flush deadline configurable. 
Signed-off-by: Tom Wilkie --- config/config.go | 6 ++++++ storage/remote/queue_manager.go | 11 ++++------- 2 files changed, 10 insertions(+), 7 deletions(-) diff --git a/config/config.go b/config/config.go index 746aae3bf7..05554d2e23 100644 --- a/config/config.go +++ b/config/config.go @@ -122,6 +122,8 @@ var ( MaxRetries: 10, MinBackoff: 30 * time.Millisecond, MaxBackoff: 100 * time.Millisecond, + + FlushDeadline: 1 * time.Minute, } // DefaultRemoteReadConfig is the default remote read configuration. @@ -646,6 +648,10 @@ type QueueConfig struct { // On recoverable errors, backoff exponentially. MinBackoff time.Duration `yaml:"min_backoff,omitempty"` MaxBackoff time.Duration `yaml:"max_backoff,omitempty"` + + // On shutdown or config reload allow the following duration for flushing + // pending samples, otherwise continue without waiting. + FlushDeadline time.Duration `yaml:"flush_deadline"` } // RemoteReadConfig is the configuration for reading from remote storage. diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go index abe0caf7cd..c0e3df6d21 100644 --- a/storage/remote/queue_manager.go +++ b/storage/remote/queue_manager.go @@ -47,9 +47,6 @@ const ( // Limit to 1 log event every 10s logRateLimit = 0.1 logBurst = 10 - - // Allow 1 minute to flush samples`, then exit anyway. - stopFlushDeadline = 1 * time.Minute ) var ( @@ -259,7 +256,7 @@ func (t *QueueManager) Stop() { t.shardsMtx.Lock() defer t.shardsMtx.Unlock() - t.shards.stop() + t.shards.stop(t.cfg.FlushDeadline) level.Info(t.logger).Log("msg", "Remote storage stopped.") } @@ -364,7 +361,7 @@ func (t *QueueManager) reshard(n int) { t.shards = newShards t.shardsMtx.Unlock() - oldShards.stop() + oldShards.stop(t.cfg.FlushDeadline) // We start the newShards after we have stopped (the therefore completely // flushed) the oldShards, to guarantee we only every deliver samples in @@ -403,14 +400,14 @@ func (s *shards) start() { } } -func (s *shards) stop() { +func (s *shards) stop(deadline time.Duration) { for _, shard := range s.queues { close(shard) } select { case <-s.done: - case <-time.After(stopFlushDeadline): + case <-time.After(deadline): level.Error(s.qm.logger).Log("msg", "Failed to flush all samples on shutdown") } } From e51d6c4b6ca5aa45e61b03941a727ef1e1919146 Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Wed, 23 May 2018 15:03:54 +0100 Subject: [PATCH 4/7] Make remote flush deadline a command line param. 
Signed-off-by: Tom Wilkie --- cmd/prometheus/main.go | 24 ++++++++++++++---------- config/config.go | 6 ------ storage/remote/queue_manager.go | 8 +++++--- storage/remote/queue_manager_test.go | 10 ++++++---- storage/remote/storage.go | 11 +++++++++-- 5 files changed, 34 insertions(+), 25 deletions(-) diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index 93898863a6..b411a7342e 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -83,15 +83,16 @@ func main() { cfg := struct { configFile string - localStoragePath string - notifier notifier.Options - notifierTimeout model.Duration - web web.Options - tsdb tsdb.Options - lookbackDelta model.Duration - webTimeout model.Duration - queryTimeout model.Duration - queryConcurrency int + localStoragePath string + notifier notifier.Options + notifierTimeout model.Duration + web web.Options + tsdb tsdb.Options + lookbackDelta model.Duration + webTimeout model.Duration + queryTimeout model.Duration + queryConcurrency int + RemoteFlushDeadline model.Duration prometheusURL string @@ -160,6 +161,9 @@ func main() { a.Flag("storage.tsdb.no-lockfile", "Do not create lockfile in data directory."). Default("false").BoolVar(&cfg.tsdb.NoLockfile) + a.Flag("storage.remote.flush-deadline", "How long to wait flushing sample on shutdown or config reload."). + Default(1 * time.Minute).PlaceHolder("").SetValue(&cfg.RemoteFlushDeadline) + a.Flag("alertmanager.notification-queue-capacity", "The capacity of the queue for pending Alertmanager notifications."). Default("10000").IntVar(&cfg.notifier.QueueCapacity) @@ -222,7 +226,7 @@ func main() { var ( localStorage = &tsdb.ReadyStorage{} - remoteStorage = remote.NewStorage(log.With(logger, "component", "remote"), localStorage.StartTime) + remoteStorage = remote.NewStorage(log.With(logger, "component", "remote"), localStorage.StartTime, cfg.RemoteFlushDeadline) fanoutStorage = storage.NewFanout(logger, localStorage, remoteStorage) ) diff --git a/config/config.go b/config/config.go index 05554d2e23..746aae3bf7 100644 --- a/config/config.go +++ b/config/config.go @@ -122,8 +122,6 @@ var ( MaxRetries: 10, MinBackoff: 30 * time.Millisecond, MaxBackoff: 100 * time.Millisecond, - - FlushDeadline: 1 * time.Minute, } // DefaultRemoteReadConfig is the default remote read configuration. @@ -648,10 +646,6 @@ type QueueConfig struct { // On recoverable errors, backoff exponentially. MinBackoff time.Duration `yaml:"min_backoff,omitempty"` MaxBackoff time.Duration `yaml:"max_backoff,omitempty"` - - // On shutdown or config reload allow the following duration for flushing - // pending samples, otherwise continue without waiting. - FlushDeadline time.Duration `yaml:"flush_deadline"` } // RemoteReadConfig is the configuration for reading from remote storage. diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go index c0e3df6d21..19ee53afcc 100644 --- a/storage/remote/queue_manager.go +++ b/storage/remote/queue_manager.go @@ -140,6 +140,7 @@ type StorageClient interface { type QueueManager struct { logger log.Logger + flushDeadline time.Duration cfg config.QueueConfig externalLabels model.LabelSet relabelConfigs []*config.RelabelConfig @@ -159,12 +160,13 @@ type QueueManager struct { } // NewQueueManager builds a new QueueManager. 
-func NewQueueManager(logger log.Logger, cfg config.QueueConfig, externalLabels model.LabelSet, relabelConfigs []*config.RelabelConfig, client StorageClient) *QueueManager { +func NewQueueManager(logger log.Logger, cfg config.QueueConfig, externalLabels model.LabelSet, relabelConfigs []*config.RelabelConfig, client StorageClient, flushDeadline time.Duration) *QueueManager { if logger == nil { logger = log.NewNopLogger() } t := &QueueManager{ logger: logger, + flushDeadline: flushDeadline, cfg: cfg, externalLabels: externalLabels, relabelConfigs: relabelConfigs, @@ -256,7 +258,7 @@ func (t *QueueManager) Stop() { t.shardsMtx.Lock() defer t.shardsMtx.Unlock() - t.shards.stop(t.cfg.FlushDeadline) + t.shards.stop(t.flushDeadline) level.Info(t.logger).Log("msg", "Remote storage stopped.") } @@ -361,7 +363,7 @@ func (t *QueueManager) reshard(n int) { t.shards = newShards t.shardsMtx.Unlock() - oldShards.stop(t.cfg.FlushDeadline) + oldShards.stop(t.flushDeadline) // We start the newShards after we have stopped (the therefore completely // flushed) the oldShards, to guarantee we only every deliver samples in diff --git a/storage/remote/queue_manager_test.go b/storage/remote/queue_manager_test.go index e2a5b19cb5..b2b2804d38 100644 --- a/storage/remote/queue_manager_test.go +++ b/storage/remote/queue_manager_test.go @@ -26,6 +26,8 @@ import ( "github.com/prometheus/prometheus/prompb" ) +const defaultFlushDeadline = 1 * time.Minute + type TestStorageClient struct { receivedSamples map[string][]*prompb.Sample expectedSamples map[string][]*prompb.Sample @@ -109,7 +111,7 @@ func TestSampleDelivery(t *testing.T) { cfg := config.DefaultQueueConfig cfg.MaxShards = 1 - m := NewQueueManager(nil, cfg, nil, nil, c) + m := NewQueueManager(nil, cfg, nil, nil, c, defaultFlushDeadline) // These should be received by the client. for _, s := range samples[:len(samples)/2] { @@ -145,7 +147,7 @@ func TestSampleDeliveryTimeout(t *testing.T) { cfg := config.DefaultQueueConfig cfg.MaxShards = 1 cfg.BatchSendDeadline = 100 * time.Millisecond - m := NewQueueManager(nil, cfg, nil, nil, c) + m := NewQueueManager(nil, cfg, nil, nil, c, defaultFlushDeadline) m.Start() defer m.Stop() @@ -181,7 +183,7 @@ func TestSampleDeliveryOrder(t *testing.T) { c := NewTestStorageClient() c.expectSamples(samples) - m := NewQueueManager(nil, config.DefaultQueueConfig, nil, nil, c) + m := NewQueueManager(nil, config.DefaultQueueConfig, nil, nil, c, defaultFlushDeadline) // These should be received by the client. for _, s := range samples { @@ -259,7 +261,7 @@ func TestSpawnNotMoreThanMaxConcurrentSendsGoroutines(t *testing.T) { cfg := config.DefaultQueueConfig cfg.MaxShards = 1 cfg.Capacity = n - m := NewQueueManager(nil, cfg, nil, nil, c) + m := NewQueueManager(nil, cfg, nil, nil, c, defaultFlushDeadline) m.Start() diff --git a/storage/remote/storage.go b/storage/remote/storage.go index 5192126eac..820d0ffa42 100644 --- a/storage/remote/storage.go +++ b/storage/remote/storage.go @@ -16,6 +16,7 @@ package remote import ( "context" "sync" + "time" "github.com/go-kit/kit/log" "github.com/prometheus/common/model" @@ -39,14 +40,19 @@ type Storage struct { // For reads queryables []storage.Queryable localStartTimeCallback startTimeCallback + flushDeadline time.Duration } // NewStorage returns a remote.Storage. 
-func NewStorage(l log.Logger, stCallback startTimeCallback) *Storage { +func NewStorage(l log.Logger, stCallback startTimeCallback, flushDeadline time.Duration) *Storage { if l == nil { l = log.NewNopLogger() } - return &Storage{logger: l, localStartTimeCallback: stCallback} + return &Storage{ + logger: l, + localStartTimeCallback: stCallback, + flushDeadline: flushDeadline, + } } // ApplyConfig updates the state as the new config requires. @@ -74,6 +80,7 @@ func (s *Storage) ApplyConfig(conf *config.Config) error { conf.GlobalConfig.ExternalLabels, rwConf.WriteRelabelConfigs, c, + s.flushDeadline, )) } From 8acad5f3cd18faf15b0b24fab09b784af8e2a722 Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Thu, 24 May 2018 15:40:24 +0100 Subject: [PATCH 5/7] make it compile Signed-off-by: Tom Wilkie --- cmd/prometheus/main.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index b411a7342e..1188a7bbe5 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -162,7 +162,7 @@ func main() { Default("false").BoolVar(&cfg.tsdb.NoLockfile) a.Flag("storage.remote.flush-deadline", "How long to wait flushing sample on shutdown or config reload."). - Default(1 * time.Minute).PlaceHolder("").SetValue(&cfg.RemoteFlushDeadline) + Default("1m").PlaceHolder("").SetValue(&cfg.RemoteFlushDeadline) a.Flag("alertmanager.notification-queue-capacity", "The capacity of the queue for pending Alertmanager notifications."). Default("10000").IntVar(&cfg.notifier.QueueCapacity) @@ -226,7 +226,7 @@ func main() { var ( localStorage = &tsdb.ReadyStorage{} - remoteStorage = remote.NewStorage(log.With(logger, "component", "remote"), localStorage.StartTime, cfg.RemoteFlushDeadline) + remoteStorage = remote.NewStorage(log.With(logger, "component", "remote"), localStorage.StartTime, time.Duration(cfg.RemoteFlushDeadline)) fanoutStorage = storage.NewFanout(logger, localStorage, remoteStorage) ) From 3353bbd01871020643a5c0608303eba36592b7d2 Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Tue, 29 May 2018 09:51:29 +0100 Subject: [PATCH 6/7] Add proper unclean shutdown handling with a cancellable context. Signed-off-by: Tom Wilkie --- storage/remote/client.go | 3 ++- storage/remote/client_test.go | 3 ++- storage/remote/queue_manager.go | 21 +++++++++++++++--- storage/remote/queue_manager_test.go | 33 +++++++++++++++++++++++++--- 4 files changed, 52 insertions(+), 8 deletions(-) diff --git a/storage/remote/client.go b/storage/remote/client.go index adfb9910d9..ae42e99235 100644 --- a/storage/remote/client.go +++ b/storage/remote/client.go @@ -69,7 +69,7 @@ type recoverableError struct { } // Store sends a batch of samples to the HTTP endpoint. 
-func (c *Client) Store(req *prompb.WriteRequest) error { +func (c *Client) Store(ctx context.Context, req *prompb.WriteRequest) error { data, err := proto.Marshal(req) if err != nil { return err @@ -85,6 +85,7 @@ func (c *Client) Store(req *prompb.WriteRequest) error { httpReq.Header.Add("Content-Encoding", "snappy") httpReq.Header.Set("Content-Type", "application/x-protobuf") httpReq.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.0") + httpReq = httpReq.WithContext(ctx) ctx, cancel := context.WithTimeout(context.Background(), c.timeout) defer cancel() diff --git a/storage/remote/client_test.go b/storage/remote/client_test.go index b0b93aad84..31f23cbb85 100644 --- a/storage/remote/client_test.go +++ b/storage/remote/client_test.go @@ -14,6 +14,7 @@ package remote import ( + "context" "fmt" "net/http" "net/http/httptest" @@ -73,7 +74,7 @@ func TestStoreHTTPErrorHandling(t *testing.T) { t.Fatal(err) } - err = c.Store(&prompb.WriteRequest{}) + err = c.Store(context.TODO(), &prompb.WriteRequest{}) if !reflect.DeepEqual(err, test.err) { t.Errorf("%d. Unexpected error; want %v, got %v", i, test.err, err) } diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go index 19ee53afcc..780a3689a7 100644 --- a/storage/remote/queue_manager.go +++ b/storage/remote/queue_manager.go @@ -14,6 +14,7 @@ package remote import ( + "context" "math" "sync" "sync/atomic" @@ -130,7 +131,7 @@ func init() { // external timeseries database. type StorageClient interface { // Store stores the given samples in the remote storage. - Store(*prompb.WriteRequest) error + Store(context.Context, *prompb.WriteRequest) error // Name identifies the remote storage implementation. Name() string } @@ -376,6 +377,8 @@ type shards struct { queues []chan *model.Sample done chan struct{} running int32 + ctx context.Context + cancel context.CancelFunc } func (t *QueueManager) newShards(numShards int) *shards { @@ -383,11 +386,14 @@ func (t *QueueManager) newShards(numShards int) *shards { for i := 0; i < numShards; i++ { queues[i] = make(chan *model.Sample, t.cfg.Capacity) } + ctx, cancel := context.WithCancel(context.Background()) s := &shards{ qm: t, queues: queues, done: make(chan struct{}), running: int32(numShards), + ctx: ctx, + cancel: cancel, } return s } @@ -403,15 +409,21 @@ func (s *shards) start() { } func (s *shards) stop(deadline time.Duration) { + // Attempt a clean shutdown. for _, shard := range s.queues { close(shard) } - select { case <-s.done: + return case <-time.After(deadline): level.Error(s.qm.logger).Log("msg", "Failed to flush all samples on shutdown") } + + // Force a unclean shutdown. 
+ s.cancel() + <-s.done + return } func (s *shards) enqueue(sample *model.Sample) bool { @@ -455,6 +467,9 @@ func (s *shards) runShard(i int) { for { select { + case <-s.ctx.Done(): + return + case sample, ok := <-queue: if !ok { if len(pendingSamples) > 0 { @@ -502,7 +517,7 @@ func (s *shards) sendSamplesWithBackoff(samples model.Samples) { for retries := s.qm.cfg.MaxRetries; retries > 0; retries-- { begin := time.Now() req := ToWriteRequest(samples) - err := s.qm.client.Store(req) + err := s.qm.client.Store(s.ctx, req) sentBatchDuration.WithLabelValues(s.qm.queueName).Observe(time.Since(begin).Seconds()) if err == nil { diff --git a/storage/remote/queue_manager_test.go b/storage/remote/queue_manager_test.go index b2b2804d38..82169899fc 100644 --- a/storage/remote/queue_manager_test.go +++ b/storage/remote/queue_manager_test.go @@ -14,6 +14,7 @@ package remote import ( + "context" "fmt" "reflect" "sync" @@ -71,7 +72,7 @@ func (c *TestStorageClient) waitForExpectedSamples(t *testing.T) { } } -func (c *TestStorageClient) Store(req *prompb.WriteRequest) error { +func (c *TestStorageClient) Store(_ context.Context, req *prompb.WriteRequest) error { c.mtx.Lock() defer c.mtx.Unlock() count := 0 @@ -211,9 +212,12 @@ func NewTestBlockedStorageClient() *TestBlockingStorageClient { } } -func (c *TestBlockingStorageClient) Store(_ *prompb.WriteRequest) error { +func (c *TestBlockingStorageClient) Store(ctx context.Context, _ *prompb.WriteRequest) error { atomic.AddUint64(&c.numCalls, 1) - <-c.block + select { + case <-c.block: + case <-ctx.Done(): + } return nil } @@ -301,3 +305,26 @@ func TestSpawnNotMoreThanMaxConcurrentSendsGoroutines(t *testing.T) { t.Errorf("Saw %d concurrent sends, expected 1", numCalls) } } + +func TestShutdown(t *testing.T) { + deadline := 10 * time.Second + c := NewTestBlockedStorageClient() + m := NewQueueManager(nil, config.DefaultQueueConfig, nil, nil, c, deadline) + for i := 0; i < config.DefaultQueueConfig.MaxSamplesPerSend; i++ { + m.Append(&model.Sample{ + Metric: model.Metric{ + model.MetricNameLabel: model.LabelValue(fmt.Sprintf("test_metric_%d", i)), + }, + Value: model.SampleValue(i), + Timestamp: model.Time(i), + }) + } + m.Start() + + start := time.Now() + m.Stop() + duration := time.Now().Sub(start) + if duration > deadline+(deadline/10) { + t.Errorf("Took too long to shutdown: %s > %s", duration, deadline) + } +} From b58199bf1264307e020a2ab95f0be75ec82e0d45 Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Tue, 29 May 2018 11:35:43 +0100 Subject: [PATCH 7/7] Review feedback. Signed-off-by: Tom Wilkie --- storage/remote/client_test.go | 2 +- storage/remote/queue_manager.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/storage/remote/client_test.go b/storage/remote/client_test.go index 31f23cbb85..73ec875a5d 100644 --- a/storage/remote/client_test.go +++ b/storage/remote/client_test.go @@ -74,7 +74,7 @@ func TestStoreHTTPErrorHandling(t *testing.T) { t.Fatal(err) } - err = c.Store(context.TODO(), &prompb.WriteRequest{}) + err = c.Store(context.Background(), &prompb.WriteRequest{}) if !reflect.DeepEqual(err, test.err) { t.Errorf("%d. 
Unexpected error; want %v, got %v", i, test.err, err) } diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go index 780a3689a7..7408c1f596 100644 --- a/storage/remote/queue_manager.go +++ b/storage/remote/queue_manager.go @@ -420,7 +420,7 @@ func (s *shards) stop(deadline time.Duration) { level.Error(s.qm.logger).Log("msg", "Failed to flush all samples on shutdown") } - // Force a unclean shutdown. + // Force an unclean shutdown. s.cancel() <-s.done return
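
Taken together, the series converges on a two-phase shutdown for the remote-write shards: first close and drain the per-shard queues and wait up to a configurable deadline for the clean flush to finish, then cancel a shared context so that any send still blocked on an unreachable remote endpoint returns and the shard goroutines can exit. The standalone Go sketch below distills that pattern for illustration only; the names (shards, send, newShards) and the float64 "samples" are simplified stand-ins, not the actual Prometheus types or API.

// A distilled, standalone sketch of the shutdown pattern in this series —
// simplified names and types, not the real Prometheus remote-write code.
package main

import (
	"context"
	"fmt"
	"sync/atomic"
	"time"
)

type shards struct {
	queues  []chan float64 // one buffered queue per shard
	done    chan struct{}  // closed when the last shard goroutine exits
	running int32          // shard goroutines still running
	ctx     context.Context
	cancel  context.CancelFunc
	send    func(context.Context, []float64) error // stands in for StorageClient.Store
}

func newShards(n, capacity int, send func(context.Context, []float64) error) *shards {
	ctx, cancel := context.WithCancel(context.Background())
	s := &shards{
		queues:  make([]chan float64, n),
		done:    make(chan struct{}),
		running: int32(n),
		ctx:     ctx,
		cancel:  cancel,
		send:    send,
	}
	for i := range s.queues {
		s.queues[i] = make(chan float64, capacity)
	}
	return s
}

func (s *shards) start() {
	for i := range s.queues {
		go s.runShard(i)
	}
}

func (s *shards) runShard(i int) {
	// The last shard to exit closes done — this replaces the WaitGroup from patch 1.
	defer func() {
		if atomic.AddInt32(&s.running, -1) == 0 {
			close(s.done)
		}
	}()
	for {
		select {
		case <-s.ctx.Done(): // forced shutdown: drop whatever is still queued
			return
		case v, ok := <-s.queues[i]:
			if !ok { // queue closed and drained: clean shutdown
				return
			}
			// A send blocked on a dead remote returns once stop() cancels the context.
			s.send(s.ctx, []float64{v})
		}
	}
}

// stop attempts a clean flush first, then forces an unclean shutdown after the deadline.
func (s *shards) stop(deadline time.Duration) {
	for _, q := range s.queues {
		close(q)
	}
	select {
	case <-s.done:
		return
	case <-time.After(deadline):
		fmt.Println("failed to flush all samples on shutdown")
	}
	s.cancel()
	<-s.done
}

func main() {
	// A send that never completes, mimicking an unreachable remote endpoint.
	blocked := func(ctx context.Context, _ []float64) error {
		<-ctx.Done()
		return ctx.Err()
	}
	s := newShards(2, 10, blocked)
	s.start()
	s.queues[0] <- 1.0

	start := time.Now()
	s.stop(time.Second) // returns shortly after the deadline instead of hanging forever
	fmt.Println("stopped after", time.Since(start))
}

Replacing the WaitGroup from the first patch with an atomic counter and a done channel is what makes the bounded wait work without an extra goroutine: stop can select between <-s.done and time.After(deadline), and the forced phase can re-wait on the same channel after cancelling the context. With the later patches applied, the deadline comes from the new --storage.remote.flush-deadline flag (default 1m), so a setting such as --storage.remote.flush-deadline=30s should shorten the window on both shutdown and config reload.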