mirror of
https://github.com/prometheus/prometheus.git
synced 2025-03-05 20:59:13 -08:00
Remote writes: retry on recoverable errors. (#2552)
* Remote writes: retry on recoverable errors. * Add comments * Review feedback * Comments * Review feedback * Final spelling misteak (I hope). Plus, record failed samples correctly.
This commit is contained in:
parent
9775ad4754
commit
e5d7bbfc3c
|
@ -60,6 +60,10 @@ func NewClient(index int, conf *clientConfig) (*Client, error) {
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type recoverableError struct {
|
||||||
|
error
|
||||||
|
}
|
||||||
|
|
||||||
// Store sends a batch of samples to the HTTP endpoint.
|
// Store sends a batch of samples to the HTTP endpoint.
|
||||||
func (c *Client) Store(samples model.Samples) error {
|
func (c *Client) Store(samples model.Samples) error {
|
||||||
req := &WriteRequest{
|
req := &WriteRequest{
|
||||||
|
@ -97,6 +101,8 @@ func (c *Client) Store(samples model.Samples) error {
|
||||||
|
|
||||||
httpReq, err := http.NewRequest("POST", c.url.String(), &buf)
|
httpReq, err := http.NewRequest("POST", c.url.String(), &buf)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
// Errors from NewRequest are from unparseable URLs, so are not
|
||||||
|
// recoverable.
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
httpReq.Header.Add("Content-Encoding", "snappy")
|
httpReq.Header.Add("Content-Encoding", "snappy")
|
||||||
|
@ -108,11 +114,17 @@ func (c *Client) Store(samples model.Samples) error {
|
||||||
|
|
||||||
httpResp, err := ctxhttp.Do(ctx, c.client, httpReq)
|
httpResp, err := ctxhttp.Do(ctx, c.client, httpReq)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
// Errors from client.Do are from (for example) network errors, so are
|
||||||
|
// recoverable.
|
||||||
|
return recoverableError{err}
|
||||||
}
|
}
|
||||||
defer httpResp.Body.Close()
|
defer httpResp.Body.Close()
|
||||||
|
|
||||||
if httpResp.StatusCode/100 != 2 {
|
if httpResp.StatusCode/100 != 2 {
|
||||||
return fmt.Errorf("server returned HTTP status %s", httpResp.Status)
|
err = fmt.Errorf("server returned HTTP status %s", httpResp.Status)
|
||||||
|
}
|
||||||
|
if httpResp.StatusCode/100 == 5 {
|
||||||
|
return recoverableError{err}
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -33,17 +33,6 @@ const (
|
||||||
subsystem = "remote_storage"
|
subsystem = "remote_storage"
|
||||||
queue = "queue"
|
queue = "queue"
|
||||||
|
|
||||||
// With a maximum of 1000 shards, assuming an average of 100ms remote write
|
|
||||||
// time and 100 samples per batch, we will be able to push 1M samples/s.
|
|
||||||
defaultMaxShards = 1000
|
|
||||||
defaultMaxSamplesPerSend = 100
|
|
||||||
|
|
||||||
// defaultQueueCapacity is per shard - at 1000 shards, this will buffer
|
|
||||||
// 100M samples. It is configured to buffer 1000 batches, which at 100ms
|
|
||||||
// per batch is 1:40mins.
|
|
||||||
defaultQueueCapacity = defaultMaxSamplesPerSend * 1000
|
|
||||||
defaultBatchSendDeadline = 5 * time.Second
|
|
||||||
|
|
||||||
// We track samples in/out and how long pushes take using an Exponentially
|
// We track samples in/out and how long pushes take using an Exponentially
|
||||||
// Weighted Moving Average.
|
// Weighted Moving Average.
|
||||||
ewmaWeight = 0.2
|
ewmaWeight = 0.2
|
||||||
|
@ -58,12 +47,12 @@ const (
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
sentSamplesTotal = prometheus.NewCounterVec(
|
succeededSamplesTotal = prometheus.NewCounterVec(
|
||||||
prometheus.CounterOpts{
|
prometheus.CounterOpts{
|
||||||
Namespace: namespace,
|
Namespace: namespace,
|
||||||
Subsystem: subsystem,
|
Subsystem: subsystem,
|
||||||
Name: "sent_samples_total",
|
Name: "succeeded_samples_total",
|
||||||
Help: "Total number of processed samples sent to remote storage.",
|
Help: "Total number of samples successfully sent to remote storage.",
|
||||||
},
|
},
|
||||||
[]string{queue},
|
[]string{queue},
|
||||||
)
|
)
|
||||||
|
@ -72,7 +61,7 @@ var (
|
||||||
Namespace: namespace,
|
Namespace: namespace,
|
||||||
Subsystem: subsystem,
|
Subsystem: subsystem,
|
||||||
Name: "failed_samples_total",
|
Name: "failed_samples_total",
|
||||||
Help: "Total number of processed samples which failed on send to remote storage.",
|
Help: "Total number of samples which failed on send to remote storage.",
|
||||||
},
|
},
|
||||||
[]string{queue},
|
[]string{queue},
|
||||||
)
|
)
|
||||||
|
@ -125,7 +114,7 @@ var (
|
||||||
)
|
)
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
prometheus.MustRegister(sentSamplesTotal)
|
prometheus.MustRegister(succeededSamplesTotal)
|
||||||
prometheus.MustRegister(failedSamplesTotal)
|
prometheus.MustRegister(failedSamplesTotal)
|
||||||
prometheus.MustRegister(droppedSamplesTotal)
|
prometheus.MustRegister(droppedSamplesTotal)
|
||||||
prometheus.MustRegister(sentBatchDuration)
|
prometheus.MustRegister(sentBatchDuration)
|
||||||
|
@ -134,6 +123,42 @@ func init() {
|
||||||
prometheus.MustRegister(numShards)
|
prometheus.MustRegister(numShards)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// QueueManagerConfig is the configuration for the queue used to write to remote
|
||||||
|
// storage.
|
||||||
|
type QueueManagerConfig struct {
|
||||||
|
// Number of samples to buffer per shard before we start dropping them.
|
||||||
|
QueueCapacity int
|
||||||
|
// Max number of shards, i.e. amount of concurrency.
|
||||||
|
MaxShards int
|
||||||
|
// Maximum number of samples per send.
|
||||||
|
MaxSamplesPerSend int
|
||||||
|
// Maximum time sample will wait in buffer.
|
||||||
|
BatchSendDeadline time.Duration
|
||||||
|
// Max number of times to retry a batch on recoverable errors.
|
||||||
|
MaxRetries int
|
||||||
|
// On recoverable errors, backoff exponentially.
|
||||||
|
MinBackoff time.Duration
|
||||||
|
MaxBackoff time.Duration
|
||||||
|
}
|
||||||
|
|
||||||
|
// defaultQueueManagerConfig is the default remote queue configuration.
|
||||||
|
var defaultQueueManagerConfig = QueueManagerConfig{
|
||||||
|
// With a maximum of 1000 shards, assuming an average of 100ms remote write
|
||||||
|
// time and 100 samples per batch, we will be able to push 1M samples/s.
|
||||||
|
MaxShards: 1000,
|
||||||
|
MaxSamplesPerSend: 100,
|
||||||
|
|
||||||
|
// By default, buffer 1000 batches, which at 100ms per batch is 1:40mins. At
|
||||||
|
// 1000 shards, this will buffer 100M samples total.
|
||||||
|
QueueCapacity: 100 * 1000,
|
||||||
|
BatchSendDeadline: 5 * time.Second,
|
||||||
|
|
||||||
|
// Max number of times to retry a batch on recoverable errors.
|
||||||
|
MaxRetries: 10,
|
||||||
|
MinBackoff: 30 * time.Millisecond,
|
||||||
|
MaxBackoff: 100 * time.Millisecond,
|
||||||
|
}
|
||||||
|
|
||||||
// StorageClient defines an interface for sending a batch of samples to an
|
// StorageClient defines an interface for sending a batch of samples to an
|
||||||
// external timeseries database.
|
// external timeseries database.
|
||||||
type StorageClient interface {
|
type StorageClient interface {
|
||||||
|
@ -143,23 +168,15 @@ type StorageClient interface {
|
||||||
Name() string
|
Name() string
|
||||||
}
|
}
|
||||||
|
|
||||||
// QueueManagerConfig configures a storage queue.
|
|
||||||
type QueueManagerConfig struct {
|
|
||||||
QueueCapacity int // Number of samples to buffer per shard before we start dropping them.
|
|
||||||
MaxShards int // Max number of shards, i.e. amount of concurrency.
|
|
||||||
MaxSamplesPerSend int // Maximum number of samples per send.
|
|
||||||
BatchSendDeadline time.Duration // Maximum time sample will wait in buffer.
|
|
||||||
ExternalLabels model.LabelSet
|
|
||||||
RelabelConfigs []*config.RelabelConfig
|
|
||||||
Client StorageClient
|
|
||||||
}
|
|
||||||
|
|
||||||
// QueueManager manages a queue of samples to be sent to the Storage
|
// QueueManager manages a queue of samples to be sent to the Storage
|
||||||
// indicated by the provided StorageClient.
|
// indicated by the provided StorageClient.
|
||||||
type QueueManager struct {
|
type QueueManager struct {
|
||||||
cfg QueueManagerConfig
|
cfg QueueManagerConfig
|
||||||
queueName string
|
externalLabels model.LabelSet
|
||||||
logLimiter *rate.Limiter
|
relabelConfigs []*config.RelabelConfig
|
||||||
|
client StorageClient
|
||||||
|
queueName string
|
||||||
|
logLimiter *rate.Limiter
|
||||||
|
|
||||||
shardsMtx sync.Mutex
|
shardsMtx sync.Mutex
|
||||||
shards *shards
|
shards *shards
|
||||||
|
@ -173,23 +190,14 @@ type QueueManager struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewQueueManager builds a new QueueManager.
|
// NewQueueManager builds a new QueueManager.
|
||||||
func NewQueueManager(cfg QueueManagerConfig) *QueueManager {
|
func NewQueueManager(cfg QueueManagerConfig, externalLabels model.LabelSet, relabelConfigs []*config.RelabelConfig, client StorageClient) *QueueManager {
|
||||||
if cfg.QueueCapacity == 0 {
|
|
||||||
cfg.QueueCapacity = defaultQueueCapacity
|
|
||||||
}
|
|
||||||
if cfg.MaxShards == 0 {
|
|
||||||
cfg.MaxShards = defaultMaxShards
|
|
||||||
}
|
|
||||||
if cfg.MaxSamplesPerSend == 0 {
|
|
||||||
cfg.MaxSamplesPerSend = defaultMaxSamplesPerSend
|
|
||||||
}
|
|
||||||
if cfg.BatchSendDeadline == 0 {
|
|
||||||
cfg.BatchSendDeadline = defaultBatchSendDeadline
|
|
||||||
}
|
|
||||||
|
|
||||||
t := &QueueManager{
|
t := &QueueManager{
|
||||||
cfg: cfg,
|
cfg: cfg,
|
||||||
queueName: cfg.Client.Name(),
|
externalLabels: externalLabels,
|
||||||
|
relabelConfigs: relabelConfigs,
|
||||||
|
client: client,
|
||||||
|
queueName: client.Name(),
|
||||||
|
|
||||||
logLimiter: rate.NewLimiter(logRateLimit, logBurst),
|
logLimiter: rate.NewLimiter(logRateLimit, logBurst),
|
||||||
numShards: 1,
|
numShards: 1,
|
||||||
reshardChan: make(chan int),
|
reshardChan: make(chan int),
|
||||||
|
@ -214,14 +222,14 @@ func (t *QueueManager) Append(s *model.Sample) error {
|
||||||
snew = *s
|
snew = *s
|
||||||
snew.Metric = s.Metric.Clone()
|
snew.Metric = s.Metric.Clone()
|
||||||
|
|
||||||
for ln, lv := range t.cfg.ExternalLabels {
|
for ln, lv := range t.externalLabels {
|
||||||
if _, ok := s.Metric[ln]; !ok {
|
if _, ok := s.Metric[ln]; !ok {
|
||||||
snew.Metric[ln] = lv
|
snew.Metric[ln] = lv
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
snew.Metric = model.Metric(
|
snew.Metric = model.Metric(
|
||||||
relabel.Process(model.LabelSet(snew.Metric), t.cfg.RelabelConfigs...))
|
relabel.Process(model.LabelSet(snew.Metric), t.relabelConfigs...))
|
||||||
|
|
||||||
if snew.Metric == nil {
|
if snew.Metric == nil {
|
||||||
return nil
|
return nil
|
||||||
|
@ -467,21 +475,38 @@ func (s *shards) runShard(i int) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *shards) sendSamples(samples model.Samples) {
|
func (s *shards) sendSamples(samples model.Samples) {
|
||||||
// Samples are sent to the remote storage on a best-effort basis. If a
|
|
||||||
// sample isn't sent correctly the first time, it's simply dropped on the
|
|
||||||
// floor.
|
|
||||||
begin := time.Now()
|
begin := time.Now()
|
||||||
err := s.qm.cfg.Client.Store(samples)
|
s.sendSamplesWithBackoff(samples)
|
||||||
duration := time.Since(begin)
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
log.Warnf("error sending %d samples to remote storage: %s", len(samples), err)
|
|
||||||
failedSamplesTotal.WithLabelValues(s.qm.queueName).Add(float64(len(samples)))
|
|
||||||
} else {
|
|
||||||
sentSamplesTotal.WithLabelValues(s.qm.queueName).Add(float64(len(samples)))
|
|
||||||
}
|
|
||||||
sentBatchDuration.WithLabelValues(s.qm.queueName).Observe(duration.Seconds())
|
|
||||||
|
|
||||||
|
// These counters are used to caclulate the dynamic sharding, and as such
|
||||||
|
// should be maintained irrespective of success or failure.
|
||||||
s.qm.samplesOut.incr(int64(len(samples)))
|
s.qm.samplesOut.incr(int64(len(samples)))
|
||||||
s.qm.samplesOutDuration.incr(int64(duration))
|
s.qm.samplesOutDuration.incr(int64(time.Since(begin)))
|
||||||
|
}
|
||||||
|
|
||||||
|
// sendSamples to the remote storage with backoff for recoverable errors.
|
||||||
|
func (s *shards) sendSamplesWithBackoff(samples model.Samples) {
|
||||||
|
backoff := s.qm.cfg.MinBackoff
|
||||||
|
for retries := s.qm.cfg.MaxRetries; retries > 0; retries-- {
|
||||||
|
begin := time.Now()
|
||||||
|
err := s.qm.client.Store(samples)
|
||||||
|
|
||||||
|
sentBatchDuration.WithLabelValues(s.qm.queueName).Observe(time.Since(begin).Seconds())
|
||||||
|
if err == nil {
|
||||||
|
succeededSamplesTotal.WithLabelValues(s.qm.queueName).Add(float64(len(samples)))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Warnf("Error sending %d samples to remote storage: %s", len(samples), err)
|
||||||
|
if _, ok := err.(recoverableError); !ok {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
time.Sleep(backoff)
|
||||||
|
backoff = backoff * 2
|
||||||
|
if backoff > s.qm.cfg.MaxBackoff {
|
||||||
|
backoff = s.qm.cfg.MaxBackoff
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
failedSamplesTotal.WithLabelValues(s.qm.queueName).Add(float64(len(samples)))
|
||||||
}
|
}
|
||||||
|
|
|
@ -81,7 +81,7 @@ func (c *TestStorageClient) Name() string {
|
||||||
func TestSampleDelivery(t *testing.T) {
|
func TestSampleDelivery(t *testing.T) {
|
||||||
// Let's create an even number of send batches so we don't run into the
|
// Let's create an even number of send batches so we don't run into the
|
||||||
// batch timeout case.
|
// batch timeout case.
|
||||||
n := defaultQueueCapacity * 2
|
n := defaultQueueManagerConfig.QueueCapacity * 2
|
||||||
|
|
||||||
samples := make(model.Samples, 0, n)
|
samples := make(model.Samples, 0, n)
|
||||||
for i := 0; i < n; i++ {
|
for i := 0; i < n; i++ {
|
||||||
|
@ -97,10 +97,9 @@ func TestSampleDelivery(t *testing.T) {
|
||||||
c := NewTestStorageClient()
|
c := NewTestStorageClient()
|
||||||
c.expectSamples(samples[:len(samples)/2])
|
c.expectSamples(samples[:len(samples)/2])
|
||||||
|
|
||||||
m := NewQueueManager(QueueManagerConfig{
|
cfg := defaultQueueManagerConfig
|
||||||
Client: c,
|
cfg.MaxShards = 1
|
||||||
MaxShards: 1,
|
m := NewQueueManager(cfg, nil, nil, c)
|
||||||
})
|
|
||||||
|
|
||||||
// These should be received by the client.
|
// These should be received by the client.
|
||||||
for _, s := range samples[:len(samples)/2] {
|
for _, s := range samples[:len(samples)/2] {
|
||||||
|
@ -118,7 +117,7 @@ func TestSampleDelivery(t *testing.T) {
|
||||||
|
|
||||||
func TestSampleDeliveryOrder(t *testing.T) {
|
func TestSampleDeliveryOrder(t *testing.T) {
|
||||||
ts := 10
|
ts := 10
|
||||||
n := defaultMaxSamplesPerSend * ts
|
n := defaultQueueManagerConfig.MaxSamplesPerSend * ts
|
||||||
|
|
||||||
samples := make(model.Samples, 0, n)
|
samples := make(model.Samples, 0, n)
|
||||||
for i := 0; i < n; i++ {
|
for i := 0; i < n; i++ {
|
||||||
|
@ -134,11 +133,7 @@ func TestSampleDeliveryOrder(t *testing.T) {
|
||||||
|
|
||||||
c := NewTestStorageClient()
|
c := NewTestStorageClient()
|
||||||
c.expectSamples(samples)
|
c.expectSamples(samples)
|
||||||
m := NewQueueManager(QueueManagerConfig{
|
m := NewQueueManager(defaultQueueManagerConfig, nil, nil, c)
|
||||||
Client: c,
|
|
||||||
// Ensure we don't drop samples in this test.
|
|
||||||
QueueCapacity: n,
|
|
||||||
})
|
|
||||||
|
|
||||||
// These should be received by the client.
|
// These should be received by the client.
|
||||||
for _, s := range samples {
|
for _, s := range samples {
|
||||||
|
@ -199,7 +194,7 @@ func TestSpawnNotMoreThanMaxConcurrentSendsGoroutines(t *testing.T) {
|
||||||
// `MaxSamplesPerSend*Shards` samples should be consumed by the
|
// `MaxSamplesPerSend*Shards` samples should be consumed by the
|
||||||
// per-shard goroutines, and then another `MaxSamplesPerSend`
|
// per-shard goroutines, and then another `MaxSamplesPerSend`
|
||||||
// should be left on the queue.
|
// should be left on the queue.
|
||||||
n := defaultMaxSamplesPerSend*1 + defaultMaxSamplesPerSend
|
n := defaultQueueManagerConfig.MaxSamplesPerSend * 2
|
||||||
|
|
||||||
samples := make(model.Samples, 0, n)
|
samples := make(model.Samples, 0, n)
|
||||||
for i := 0; i < n; i++ {
|
for i := 0; i < n; i++ {
|
||||||
|
@ -213,11 +208,10 @@ func TestSpawnNotMoreThanMaxConcurrentSendsGoroutines(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
c := NewTestBlockedStorageClient()
|
c := NewTestBlockedStorageClient()
|
||||||
m := NewQueueManager(QueueManagerConfig{
|
cfg := defaultQueueManagerConfig
|
||||||
Client: c,
|
cfg.MaxShards = 1
|
||||||
QueueCapacity: n,
|
cfg.QueueCapacity = n
|
||||||
MaxShards: 1,
|
m := NewQueueManager(cfg, nil, nil, c)
|
||||||
})
|
|
||||||
|
|
||||||
m.Start()
|
m.Start()
|
||||||
|
|
||||||
|
@ -246,7 +240,7 @@ func TestSpawnNotMoreThanMaxConcurrentSendsGoroutines(t *testing.T) {
|
||||||
time.Sleep(10 * time.Millisecond)
|
time.Sleep(10 * time.Millisecond)
|
||||||
}
|
}
|
||||||
|
|
||||||
if m.queueLen() != defaultMaxSamplesPerSend {
|
if m.queueLen() != defaultQueueManagerConfig.MaxSamplesPerSend {
|
||||||
t.Fatalf("Failed to drain QueueManager queue, %d elements left",
|
t.Fatalf("Failed to drain QueueManager queue, %d elements left",
|
||||||
m.queueLen(),
|
m.queueLen(),
|
||||||
)
|
)
|
||||||
|
|
|
@ -44,11 +44,12 @@ func (w *Writer) ApplyConfig(conf *config.Config) error {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
newQueues = append(newQueues, NewQueueManager(QueueManagerConfig{
|
newQueues = append(newQueues, NewQueueManager(
|
||||||
Client: c,
|
defaultQueueManagerConfig,
|
||||||
ExternalLabels: conf.GlobalConfig.ExternalLabels,
|
conf.GlobalConfig.ExternalLabels,
|
||||||
RelabelConfigs: rwConf.WriteRelabelConfigs,
|
rwConf.WriteRelabelConfigs,
|
||||||
}))
|
c,
|
||||||
|
))
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, q := range w.queues {
|
for _, q := range w.queues {
|
||||||
|
|
Loading…
Reference in a new issue