Merge pull request #4187 from prometheus/2972-remote-block-shutdown

Only give remote queues 1 minute to flush samples on shutdown.
Tom Wilkie 2018-05-29 14:25:19 +01:00 committed by GitHub
commit 12a1a22a5a
6 changed files with 107 additions and 37 deletions

cmd/prometheus/main.go

@@ -83,15 +83,16 @@ func main() {
 	cfg := struct {
 		configFile string
 
 		localStoragePath string
 		notifier         notifier.Options
 		notifierTimeout  model.Duration
 		web              web.Options
 		tsdb             tsdb.Options
 		lookbackDelta    model.Duration
 		webTimeout       model.Duration
 		queryTimeout     model.Duration
 		queryConcurrency int
+		RemoteFlushDeadline model.Duration
 
 		prometheusURL string
@@ -160,6 +161,9 @@ func main() {
 	a.Flag("storage.tsdb.no-lockfile", "Do not create lockfile in data directory.").
 		Default("false").BoolVar(&cfg.tsdb.NoLockfile)
 
+	a.Flag("storage.remote.flush-deadline", "How long to wait flushing sample on shutdown or config reload.").
+		Default("1m").PlaceHolder("<duration>").SetValue(&cfg.RemoteFlushDeadline)
+
 	a.Flag("alertmanager.notification-queue-capacity", "The capacity of the queue for pending Alertmanager notifications.").
 		Default("10000").IntVar(&cfg.notifier.QueueCapacity)
@@ -222,7 +226,7 @@ func main() {
 	var (
 		localStorage  = &tsdb.ReadyStorage{}
-		remoteStorage = remote.NewStorage(log.With(logger, "component", "remote"), localStorage.StartTime)
+		remoteStorage = remote.NewStorage(log.With(logger, "component", "remote"), localStorage.StartTime, time.Duration(cfg.RemoteFlushDeadline))
 		fanoutStorage = storage.NewFanout(logger, localStorage, remoteStorage)
 	)
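The flag's value is a model.Duration, so it accepts Prometheus-style strings such as "1m" or "2h", and *model.Duration satisfies kingpin's Value interface, which is why SetValue works here; the conversion to time.Duration happens once, at the remote.NewStorage call site. A minimal standalone sketch of the same wiring (program name, argument list, and printed output are illustrative only, not from this PR):

package main

import (
	"fmt"
	"time"

	"github.com/prometheus/common/model"
	"gopkg.in/alecthomas/kingpin.v2"
)

func main() {
	var flushDeadline model.Duration
	a := kingpin.New("example", "duration-flag sketch")
	a.Flag("storage.remote.flush-deadline", "How long to wait flushing samples on shutdown.").
		Default("1m").PlaceHolder("<duration>").SetValue(&flushDeadline)

	// Parse a fixed argument list instead of os.Args[1:] to keep the
	// sketch deterministic.
	kingpin.MustParse(a.Parse([]string{"--storage.remote.flush-deadline=30s"}))

	// Components that want a time.Duration get a plain conversion.
	fmt.Println(time.Duration(flushDeadline)) // prints 30s
}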

storage/remote/client.go

@@ -69,7 +69,7 @@ type recoverableError struct {
 }
 
 // Store sends a batch of samples to the HTTP endpoint.
-func (c *Client) Store(req *prompb.WriteRequest) error {
+func (c *Client) Store(ctx context.Context, req *prompb.WriteRequest) error {
 	data, err := proto.Marshal(req)
 	if err != nil {
 		return err
@@ -85,6 +85,7 @@ func (c *Client) Store(req *prompb.WriteRequest) error {
 	httpReq.Header.Add("Content-Encoding", "snappy")
 	httpReq.Header.Set("Content-Type", "application/x-protobuf")
 	httpReq.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.0")
+	httpReq = httpReq.WithContext(ctx)
 
 	ctx, cancel := context.WithTimeout(context.Background(), c.timeout)
 	defer cancel()
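Store now attaches the caller's context to the outgoing request, so cancelling that context (as the forced shutdown below does) aborts a send even while it is blocked on the network. A condensed sketch of the pattern under assumed names (post stands in for Client.Store, the 30-second timeout for c.timeout); unlike the code above, it derives the per-request timeout from the caller's context rather than from context.Background():

package sketch

import (
	"bytes"
	"context"
	"fmt"
	"io"
	"io/ioutil"
	"net/http"
	"time"
)

func post(ctx context.Context, client *http.Client, url string, body []byte) error {
	req, err := http.NewRequest("POST", url, bytes.NewReader(body))
	if err != nil {
		return err
	}
	// The request carries the caller's context: cancellation and the
	// timeout both abort it, even mid-transfer.
	ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
	defer cancel()
	resp, err := client.Do(req.WithContext(ctx))
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	io.Copy(ioutil.Discard, resp.Body) // drain so the connection is reused
	if resp.StatusCode/100 != 2 {
		return fmt.Errorf("server returned %s", resp.Status)
	}
	return nil
}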

storage/remote/client_test.go

@@ -14,6 +14,7 @@
 package remote
 
 import (
+	"context"
 	"fmt"
 	"net/http"
 	"net/http/httptest"
@@ -73,7 +74,7 @@ func TestStoreHTTPErrorHandling(t *testing.T) {
 			t.Fatal(err)
 		}
 
-		err = c.Store(&prompb.WriteRequest{})
+		err = c.Store(context.Background(), &prompb.WriteRequest{})
 		if !reflect.DeepEqual(err, test.err) {
 			t.Errorf("%d. Unexpected error; want %v, got %v", i, test.err, err)
 		}

storage/remote/queue_manager.go

@@ -14,8 +14,10 @@
 package remote
 
 import (
+	"context"
 	"math"
 	"sync"
+	"sync/atomic"
 	"time"
 
 	"golang.org/x/time/rate"
@@ -129,7 +131,7 @@ func init() {
 // external timeseries database.
 type StorageClient interface {
 	// Store stores the given samples in the remote storage.
-	Store(*prompb.WriteRequest) error
+	Store(context.Context, *prompb.WriteRequest) error
 	// Name identifies the remote storage implementation.
 	Name() string
 }
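Every StorageClient implementation now receives the shard's context and is expected to give up once it is cancelled. A sketch of a minimal conforming client (hypothetical, not part of this PR):

// nopClient discards every batch, but still reports cancellation the way
// a real client whose HTTP request was aborted would.
type nopClient struct{}

func (nopClient) Store(ctx context.Context, req *prompb.WriteRequest) error {
	select {
	case <-ctx.Done():
		return ctx.Err()
	default:
		return nil
	}
}

// Name identifies the remote storage implementation.
func (nopClient) Name() string { return "nop" }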
@@ -139,6 +141,7 @@ type StorageClient interface {
 type QueueManager struct {
 	logger log.Logger
 
+	flushDeadline  time.Duration
 	cfg            config.QueueConfig
 	externalLabels model.LabelSet
 	relabelConfigs []*config.RelabelConfig
@@ -158,12 +161,13 @@ type QueueManager struct {
 }
 
 // NewQueueManager builds a new QueueManager.
-func NewQueueManager(logger log.Logger, cfg config.QueueConfig, externalLabels model.LabelSet, relabelConfigs []*config.RelabelConfig, client StorageClient) *QueueManager {
+func NewQueueManager(logger log.Logger, cfg config.QueueConfig, externalLabels model.LabelSet, relabelConfigs []*config.RelabelConfig, client StorageClient, flushDeadline time.Duration) *QueueManager {
 	if logger == nil {
 		logger = log.NewNopLogger()
 	}
 	t := &QueueManager{
 		logger:         logger,
+		flushDeadline:  flushDeadline,
 		cfg:            cfg,
 		externalLabels: externalLabels,
 		relabelConfigs: relabelConfigs,
@@ -255,7 +259,7 @@ func (t *QueueManager) Stop() {
 	t.shardsMtx.Lock()
 	defer t.shardsMtx.Unlock()
-	t.shards.stop()
+	t.shards.stop(t.flushDeadline)
 
 	level.Info(t.logger).Log("msg", "Remote storage stopped.")
 }
@@ -360,7 +364,7 @@ func (t *QueueManager) reshard(n int) {
 	t.shards = newShards
 	t.shardsMtx.Unlock()
 
-	oldShards.stop()
+	oldShards.stop(t.flushDeadline)
 
 	// We start the newShards after we have stopped (the therefore completely
 	// flushed) the oldShards, to guarantee we only every deliver samples in
@@ -369,10 +373,12 @@ func (t *QueueManager) reshard(n int) {
 }
 
 type shards struct {
 	qm     *QueueManager
 	queues []chan *model.Sample
 	done   chan struct{}
-	wg     sync.WaitGroup
+	running int32
+	ctx     context.Context
+	cancel  context.CancelFunc
 }
 
 func (t *QueueManager) newShards(numShards int) *shards {
@@ -380,12 +386,15 @@ func (t *QueueManager) newShards(numShards int) *shards {
 	queues := make([]chan *model.Sample, numShards)
 	for i := 0; i < numShards; i++ {
 		queues[i] = make(chan *model.Sample, t.cfg.Capacity)
 	}
+	ctx, cancel := context.WithCancel(context.Background())
 	s := &shards{
 		qm:      t,
 		queues:  queues,
 		done:    make(chan struct{}),
+		running: int32(numShards),
+		ctx:     ctx,
+		cancel:  cancel,
 	}
-	s.wg.Add(numShards)
 	return s
 }
@@ -399,11 +408,22 @@ func (s *shards) start() {
 	}
 }
 
-func (s *shards) stop() {
+func (s *shards) stop(deadline time.Duration) {
+	// Attempt a clean shutdown.
 	for _, shard := range s.queues {
 		close(shard)
 	}
-	s.wg.Wait()
+	select {
+	case <-s.done:
+		return
+	case <-time.After(deadline):
+		level.Error(s.qm.logger).Log("msg", "Failed to flush all samples on shutdown")
+	}
+
+	// Force an unclean shutdown.
+	s.cancel()
+	<-s.done
+	return
 }
 
 func (s *shards) enqueue(sample *model.Sample) bool {
@@ -421,7 +441,12 @@ func (s *shards) enqueue(sample *model.Sample) bool {
 	}
 }
 
 func (s *shards) runShard(i int) {
-	defer s.wg.Done()
+	defer func() {
+		if atomic.AddInt32(&s.running, -1) == 0 {
+			close(s.done)
+		}
+	}()
 	queue := s.queues[i]
 
 	// Send batches of at most MaxSamplesPerSend samples to the remote storage.
@@ -442,6 +467,9 @@ func (s *shards) runShard(i int) {
 	for {
 		select {
+		case <-s.ctx.Done():
+			return
+
 		case sample, ok := <-queue:
 			if !ok {
 				if len(pendingSamples) > 0 {
@@ -489,7 +517,7 @@ func (s *shards) sendSamplesWithBackoff(samples model.Samples) {
 	for retries := s.qm.cfg.MaxRetries; retries > 0; retries-- {
 		begin := time.Now()
 		req := ToWriteRequest(samples)
-		err := s.qm.client.Store(req)
+		err := s.qm.client.Store(s.ctx, req)
 
 		sentBatchDuration.WithLabelValues(s.qm.queueName).Observe(time.Since(begin).Seconds())
 		if err == nil {
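Taken together, this is a two-phase shutdown. stop closes the queues and waits up to deadline for the shards to drain; each shard decrements running on exit, and the last one closes done, which replaces the old sync.WaitGroup (Wait has no timeout, so it could block shutdown forever behind a dead remote). If the deadline passes first, cancelling ctx aborts the in-flight Store and every shard's select, and stop then waits for the now-prompt done. A self-contained sketch of the protocol with simplified stand-in types (pool for shards, int for samples):

package main

import (
	"context"
	"fmt"
	"sync/atomic"
	"time"
)

type pool struct {
	queues  []chan int
	done    chan struct{}
	running int32
	ctx     context.Context
	cancel  context.CancelFunc
}

func newPool(n int) *pool {
	ctx, cancel := context.WithCancel(context.Background())
	p := &pool{
		queues:  make([]chan int, n),
		done:    make(chan struct{}),
		running: int32(n),
		ctx:     ctx,
		cancel:  cancel,
	}
	for i := range p.queues {
		p.queues[i] = make(chan int, 100)
		go p.run(i)
	}
	return p
}

func (p *pool) run(i int) {
	defer func() {
		// The last worker to exit reports that every queue has stopped.
		if atomic.AddInt32(&p.running, -1) == 0 {
			close(p.done)
		}
	}()
	for {
		select {
		case <-p.ctx.Done():
			return // forced: abandon whatever is still queued
		case v, ok := <-p.queues[i]:
			if !ok {
				return // clean: queue closed and fully drained
			}
			_ = v // a real shard would send v to remote storage here
		}
	}
}

func (p *pool) stop(deadline time.Duration) {
	// Phase one: close the queues and wait for the workers to drain them.
	for _, q := range p.queues {
		close(q)
	}
	select {
	case <-p.done:
		return
	case <-time.After(deadline):
		fmt.Println("failed to flush everything before the deadline")
	}
	// Phase two: force an unclean shutdown.
	p.cancel()
	<-p.done
}

func main() {
	p := newPool(4)
	p.queues[0] <- 42
	p.stop(time.Second)
}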

storage/remote/queue_manager_test.go

@@ -14,6 +14,7 @@
 package remote
 
 import (
+	"context"
 	"fmt"
 	"reflect"
 	"sync"
@@ -26,6 +27,8 @@ import (
 	"github.com/prometheus/prometheus/prompb"
 )
 
+const defaultFlushDeadline = 1 * time.Minute
+
 type TestStorageClient struct {
 	receivedSamples map[string][]*prompb.Sample
 	expectedSamples map[string][]*prompb.Sample
@@ -69,7 +72,7 @@ func (c *TestStorageClient) waitForExpectedSamples(t *testing.T) {
 	}
 }
 
-func (c *TestStorageClient) Store(req *prompb.WriteRequest) error {
+func (c *TestStorageClient) Store(_ context.Context, req *prompb.WriteRequest) error {
 	c.mtx.Lock()
 	defer c.mtx.Unlock()
 	count := 0
@@ -109,7 +112,7 @@ func TestSampleDelivery(t *testing.T) {
 	cfg := config.DefaultQueueConfig
 	cfg.MaxShards = 1
 
-	m := NewQueueManager(nil, cfg, nil, nil, c)
+	m := NewQueueManager(nil, cfg, nil, nil, c, defaultFlushDeadline)
 
 	// These should be received by the client.
 	for _, s := range samples[:len(samples)/2] {
@@ -145,7 +148,7 @@ func TestSampleDeliveryTimeout(t *testing.T) {
 	cfg := config.DefaultQueueConfig
 	cfg.MaxShards = 1
 	cfg.BatchSendDeadline = 100 * time.Millisecond
-	m := NewQueueManager(nil, cfg, nil, nil, c)
+	m := NewQueueManager(nil, cfg, nil, nil, c, defaultFlushDeadline)
 	m.Start()
 	defer m.Stop()
@@ -181,7 +184,7 @@ func TestSampleDeliveryOrder(t *testing.T) {
 	c := NewTestStorageClient()
 	c.expectSamples(samples)
 
-	m := NewQueueManager(nil, config.DefaultQueueConfig, nil, nil, c)
+	m := NewQueueManager(nil, config.DefaultQueueConfig, nil, nil, c, defaultFlushDeadline)
 
 	// These should be received by the client.
 	for _, s := range samples {
@@ -209,9 +212,12 @@ func NewTestBlockedStorageClient() *TestBlockingStorageClient {
 	}
 }
 
-func (c *TestBlockingStorageClient) Store(_ *prompb.WriteRequest) error {
+func (c *TestBlockingStorageClient) Store(ctx context.Context, _ *prompb.WriteRequest) error {
 	atomic.AddUint64(&c.numCalls, 1)
-	<-c.block
+	select {
+	case <-c.block:
+	case <-ctx.Done():
+	}
 	return nil
 }
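The double's select is what lets the forced path in shards.stop terminate: a Store blocked on c.block returns as soon as the shard's context is cancelled. A hypothetical companion test (not in this PR) exercising that behaviour in isolation:

func TestBlockingStorageClientUnblocksOnCancel(t *testing.T) {
	c := NewTestBlockedStorageClient()
	ctx, cancel := context.WithCancel(context.Background())
	done := make(chan struct{})
	go func() {
		c.Store(ctx, &prompb.WriteRequest{}) // blocks until cancel fires
		close(done)
	}()
	cancel()
	select {
	case <-done:
	case <-time.After(time.Second):
		t.Fatal("Store did not return after context cancellation")
	}
}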
@@ -259,7 +265,7 @@ func TestSpawnNotMoreThanMaxConcurrentSendsGoroutines(t *testing.T) {
 	cfg := config.DefaultQueueConfig
 	cfg.MaxShards = 1
 	cfg.Capacity = n
-	m := NewQueueManager(nil, cfg, nil, nil, c)
+	m := NewQueueManager(nil, cfg, nil, nil, c, defaultFlushDeadline)
 
 	m.Start()
@@ -299,3 +305,26 @@ func TestSpawnNotMoreThanMaxConcurrentSendsGoroutines(t *testing.T) {
 		t.Errorf("Saw %d concurrent sends, expected 1", numCalls)
 	}
 }
+
+func TestShutdown(t *testing.T) {
+	deadline := 10 * time.Second
+	c := NewTestBlockedStorageClient()
+	m := NewQueueManager(nil, config.DefaultQueueConfig, nil, nil, c, deadline)
+	for i := 0; i < config.DefaultQueueConfig.MaxSamplesPerSend; i++ {
+		m.Append(&model.Sample{
+			Metric: model.Metric{
+				model.MetricNameLabel: model.LabelValue(fmt.Sprintf("test_metric_%d", i)),
+			},
+			Value:     model.SampleValue(i),
+			Timestamp: model.Time(i),
+		})
+	}
+	m.Start()
+
+	start := time.Now()
+	m.Stop()
+	duration := time.Now().Sub(start)
+	if duration > deadline+(deadline/10) {
+		t.Errorf("Took too long to shutdown: %s > %s", duration, deadline)
+	}
+}
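Since TestBlockingStorageClient never unblocks on its own, Stop can only return through the forced path: the deadline expires, the shared context is cancelled, and the shards exit. The assertion allows 10% slack over the 10-second deadline (deadline + deadline/10) so that scheduling jitter does not make the test flaky.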

storage/remote/storage.go

@@ -16,6 +16,7 @@ package remote
 import (
 	"context"
 	"sync"
+	"time"
 
 	"github.com/go-kit/kit/log"
 	"github.com/prometheus/common/model"
@@ -39,14 +40,19 @@ type Storage struct {
 	// For reads
 	queryables             []storage.Queryable
 	localStartTimeCallback startTimeCallback
+	flushDeadline          time.Duration
 }
 
 // NewStorage returns a remote.Storage.
-func NewStorage(l log.Logger, stCallback startTimeCallback) *Storage {
+func NewStorage(l log.Logger, stCallback startTimeCallback, flushDeadline time.Duration) *Storage {
 	if l == nil {
 		l = log.NewNopLogger()
 	}
-	return &Storage{logger: l, localStartTimeCallback: stCallback}
+	return &Storage{
+		logger:                 l,
+		localStartTimeCallback: stCallback,
+		flushDeadline:          flushDeadline,
+	}
 }
 
 // ApplyConfig updates the state as the new config requires.
@@ -74,6 +80,7 @@ func (s *Storage) ApplyConfig(conf *config.Config) error {
 			conf.GlobalConfig.ExternalLabels,
 			rwConf.WriteRelabelConfigs,
 			c,
+			s.flushDeadline,
 		))
 	}
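The deadline is not only about process shutdown: ApplyConfig rebuilds the write queues whenever the configuration is reloaded, stopping the old QueueManagers, so reloads are bounded by the same flush deadline; hence the flag help text "on shutdown or config reload".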