From ece12bff939828feb3209de1cad67dd623e0ae8b Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Mon, 29 Aug 2016 17:08:19 +0200 Subject: [PATCH] Shard/parrallelise samples by fingerprint in StorageQueueManager By splitting the single queue into multiple queues and flushing each individual queue in serially (and all queues in parallel), we can guarantee to preserve the order of timestampsin samples sent to downstream systems. --- storage/remote/queue_manager.go | 192 ++++++++++++++++----------- storage/remote/queue_manager_test.go | 121 +++++++++++++---- storage/remote/remote.go | 6 +- 3 files changed, 207 insertions(+), 112 deletions(-) diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go index d89e9406db..6f6dafba8d 100644 --- a/storage/remote/queue_manager.go +++ b/storage/remote/queue_manager.go @@ -14,6 +14,7 @@ package remote import ( + "sync" "time" "github.com/prometheus/client_golang/prometheus" @@ -21,16 +22,6 @@ import ( "github.com/prometheus/common/model" ) -const ( - // The maximum number of concurrent send requests to the remote storage. - maxConcurrentSends = 10 - // The maximum number of samples to fit into a single request to the remote storage. - maxSamplesPerSend = 100 - // The deadline after which to send queued samples even if the maximum batch - // size has not been reached. - batchSendDeadline = 5 * time.Second -) - // String constants for instrumentation. const ( namespace = "prometheus" @@ -51,14 +42,28 @@ type StorageClient interface { Name() string } +type StorageQueueManagerConfig struct { + QueueCapacity int // Number of samples to buffer per shard before we start dropping them. + Shards int // Number of shards, i.e. amount of concurrency. + MaxSamplesPerSend int // Maximum number of samples per send. + BatchSendDeadline time.Duration // Maximum time sample will wait in buffer. +} + +var defaultConfig = StorageQueueManagerConfig{ + QueueCapacity: 100 * 1024 / 10, + Shards: 10, + MaxSamplesPerSend: 100, + BatchSendDeadline: 5 * time.Second, +} + // StorageQueueManager manages a queue of samples to be sent to the Storage // indicated by the provided StorageClient. type StorageQueueManager struct { - tsdb StorageClient - queue chan *model.Sample - pendingSamples model.Samples - sendSemaphore chan bool - drained chan bool + cfg StorageQueueManagerConfig + tsdb StorageClient + shards []chan *model.Sample + wg sync.WaitGroup + done chan struct{} samplesCount *prometheus.CounterVec sendLatency prometheus.Summary @@ -69,16 +74,25 @@ type StorageQueueManager struct { } // NewStorageQueueManager builds a new StorageQueueManager. -func NewStorageQueueManager(tsdb StorageClient, queueCapacity int) *StorageQueueManager { +func NewStorageQueueManager(tsdb StorageClient, cfg *StorageQueueManagerConfig) *StorageQueueManager { constLabels := prometheus.Labels{ "type": tsdb.Name(), } - return &StorageQueueManager{ - tsdb: tsdb, - queue: make(chan *model.Sample, queueCapacity), - sendSemaphore: make(chan bool, maxConcurrentSends), - drained: make(chan bool), + if cfg == nil { + cfg = &defaultConfig + } + + shards := make([]chan *model.Sample, cfg.Shards) + for i := 0; i < cfg.Shards; i++ { + shards[i] = make(chan *model.Sample, cfg.QueueCapacity) + } + + t := &StorageQueueManager{ + cfg: *cfg, + tsdb: tsdb, + shards: shards, + done: make(chan struct{}), samplesCount: prometheus.NewCounterVec( prometheus.CounterOpts{ @@ -126,17 +140,23 @@ func NewStorageQueueManager(tsdb StorageClient, queueCapacity int) *StorageQueue constLabels, ), prometheus.GaugeValue, - float64(queueCapacity), + float64(cfg.QueueCapacity), ), } + + t.wg.Add(cfg.Shards) + return t } // Append queues a sample to be sent to the remote storage. It drops the // sample on the floor if the queue is full. // Always returns nil. func (t *StorageQueueManager) Append(s *model.Sample) error { + fp := s.Metric.FastFingerprint() + shard := uint64(fp) % uint64(t.cfg.Shards) + select { - case t.queue <- s: + case t.shards[shard] <- s: default: t.samplesCount.WithLabelValues(dropped).Inc() log.Warn("Remote storage queue full, discarding sample.") @@ -144,16 +164,11 @@ func (t *StorageQueueManager) Append(s *model.Sample) error { return nil } -// Stop stops sending samples to the remote storage and waits for pending -// sends to complete. -func (t *StorageQueueManager) Stop() { - log.Infof("Stopping remote storage...") - close(t.queue) - <-t.drained - for i := 0; i < maxConcurrentSends; i++ { - t.sendSemaphore <- true - } - log.Info("Remote storage stopped.") +// NeedsThrottling implements storage.SampleAppender. It will always return +// false as a remote storage drops samples on the floor if backlogging instead +// of asking for throttling. +func (*StorageQueueManager) NeedsThrottling() bool { + return false } // Describe implements prometheus.Collector. @@ -166,79 +181,96 @@ func (t *StorageQueueManager) Describe(ch chan<- *prometheus.Desc) { ch <- t.queueCapacity.Desc() } +// QueueLength returns the number of outstanding samples in the queue. +func (t *StorageQueueManager) queueLen() int { + queueLength := 0 + for _, shard := range t.shards { + queueLength += len(shard) + } + return queueLength +} + // Collect implements prometheus.Collector. func (t *StorageQueueManager) Collect(ch chan<- prometheus.Metric) { t.samplesCount.Collect(ch) t.sendLatency.Collect(ch) - t.queueLength.Set(float64(len(t.queue))) + t.queueLength.Set(float64(t.queueLen())) ch <- t.failedBatches ch <- t.failedSamples ch <- t.queueLength ch <- t.queueCapacity } -func (t *StorageQueueManager) sendSamples(s model.Samples) { - t.sendSemaphore <- true - - go func() { - defer func() { - <-t.sendSemaphore - }() - - // Samples are sent to the remote storage on a best-effort basis. If a - // sample isn't sent correctly the first time, it's simply dropped on the - // floor. - begin := time.Now() - err := t.tsdb.Store(s) - duration := time.Since(begin).Seconds() - - labelValue := success - if err != nil { - log.Warnf("error sending %d samples to remote storage: %s", len(s), err) - labelValue = failure - t.failedBatches.Inc() - t.failedSamples.Add(float64(len(s))) - } - t.samplesCount.WithLabelValues(labelValue).Add(float64(len(s))) - t.sendLatency.Observe(duration) - }() -} - // Run continuously sends samples to the remote storage. func (t *StorageQueueManager) Run() { - defer func() { - close(t.drained) - }() + for i := 0; i < t.cfg.Shards; i++ { + go t.runShard(i) + } + t.wg.Wait() +} - // Send batches of at most maxSamplesPerSend samples to the remote storage. +// Stop stops sending samples to the remote storage and waits for pending +// sends to complete. +func (t *StorageQueueManager) Stop() { + log.Infof("Stopping remote storage...") + for _, shard := range t.shards { + close(shard) + } + t.wg.Wait() + log.Info("Remote storage stopped.") +} + +func (t *StorageQueueManager) runShard(i int) { + defer t.wg.Done() + shard := t.shards[i] + + // Send batches of at most MaxSamplesPerSend samples to the remote storage. // If we have fewer samples than that, flush them out after a deadline // anyways. + pendingSamples := model.Samples{} + for { select { - case s, ok := <-t.queue: + case s, ok := <-shard: if !ok { - log.Infof("Flushing %d samples to remote storage...", len(t.pendingSamples)) - t.flush() - log.Infof("Done flushing.") + if len(pendingSamples) > 0 { + log.Infof("Flushing %d samples to remote storage...", len(pendingSamples)) + t.sendSamples(pendingSamples) + log.Infof("Done flushing.") + } return } - t.pendingSamples = append(t.pendingSamples, s) + pendingSamples = append(pendingSamples, s) - for len(t.pendingSamples) >= maxSamplesPerSend { - t.sendSamples(t.pendingSamples[:maxSamplesPerSend]) - t.pendingSamples = t.pendingSamples[maxSamplesPerSend:] + for len(pendingSamples) >= t.cfg.MaxSamplesPerSend { + t.sendSamples(pendingSamples[:t.cfg.MaxSamplesPerSend]) + pendingSamples = pendingSamples[t.cfg.MaxSamplesPerSend:] + } + case <-time.After(t.cfg.BatchSendDeadline): + if len(pendingSamples) > 0 { + t.sendSamples(pendingSamples) + pendingSamples = pendingSamples[:0] } - case <-time.After(batchSendDeadline): - t.flush() } } } -// Flush flushes remaining queued samples. -func (t *StorageQueueManager) flush() { - if len(t.pendingSamples) > 0 { - t.sendSamples(t.pendingSamples) +func (t *StorageQueueManager) sendSamples(s model.Samples) { + // Samples are sent to the remote storage on a best-effort basis. If a + // sample isn't sent correctly the first time, it's simply dropped on the + // floor. + begin := time.Now() + err := t.tsdb.Store(s) + duration := time.Since(begin).Seconds() + + labelValue := success + if err != nil { + log.Warnf("error sending %d samples to remote storage: %s", len(s), err) + labelValue = failure + t.failedBatches.Inc() + t.failedSamples.Add(float64(len(s))) } - t.pendingSamples = t.pendingSamples[:0] + t.samplesCount.WithLabelValues(labelValue).Add(float64(len(s))) + t.sendLatency.Observe(duration) } diff --git a/storage/remote/queue_manager_test.go b/storage/remote/queue_manager_test.go index afe23178f2..f3bd85aa31 100644 --- a/storage/remote/queue_manager_test.go +++ b/storage/remote/queue_manager_test.go @@ -14,6 +14,7 @@ package remote import ( + "fmt" "sync" "sync/atomic" "testing" @@ -23,28 +24,53 @@ import ( ) type TestStorageClient struct { - receivedSamples model.Samples - expectedSamples model.Samples + receivedSamples map[string]model.Samples + expectedSamples map[string]model.Samples wg sync.WaitGroup + mtx sync.Mutex } -func (c *TestStorageClient) expectSamples(s model.Samples) { - c.expectedSamples = append(c.expectedSamples, s...) - c.wg.Add(len(s)) +func NewTestStorageClient() *TestStorageClient { + return &TestStorageClient{ + receivedSamples: map[string]model.Samples{}, + expectedSamples: map[string]model.Samples{}, + } +} + +func (c *TestStorageClient) expectSamples(ss model.Samples) { + c.mtx.Lock() + defer c.mtx.Unlock() + + for _, s := range ss { + ts := s.Metric.String() + c.expectedSamples[ts] = append(c.expectedSamples[ts], s) + } + c.wg.Add(len(ss)) } func (c *TestStorageClient) waitForExpectedSamples(t *testing.T) { c.wg.Wait() - for i, expected := range c.expectedSamples { - if !expected.Equal(c.receivedSamples[i]) { - t.Fatalf("%d. Expected %v, got %v", i, expected, c.receivedSamples[i]) + + c.mtx.Lock() + defer c.mtx.Unlock() + for ts, expectedSamples := range c.expectedSamples { + for i, expected := range expectedSamples { + if !expected.Equal(c.receivedSamples[ts][i]) { + t.Fatalf("%d. Expected %v, got %v", i, expected, c.receivedSamples[ts][i]) + } } } } -func (c *TestStorageClient) Store(s model.Samples) error { - c.receivedSamples = append(c.receivedSamples, s...) - c.wg.Add(-len(s)) +func (c *TestStorageClient) Store(ss model.Samples) error { + c.mtx.Lock() + defer c.mtx.Unlock() + + for _, s := range ss { + ts := s.Metric.String() + c.receivedSamples[ts] = append(c.receivedSamples[ts], s) + } + c.wg.Add(-len(ss)) return nil } @@ -55,21 +81,24 @@ func (c *TestStorageClient) Name() string { func TestSampleDelivery(t *testing.T) { // Let's create an even number of send batches so we don't run into the // batch timeout case. - n := maxSamplesPerSend * 2 + cfg := defaultConfig + n := cfg.QueueCapacity * 2 + cfg.Shards = 1 samples := make(model.Samples, 0, n) for i := 0; i < n; i++ { + name := model.LabelValue(fmt.Sprintf("test_metric_%d", i)) samples = append(samples, &model.Sample{ Metric: model.Metric{ - model.MetricNameLabel: "test_metric", + model.MetricNameLabel: name, }, Value: model.SampleValue(i), }) } - c := &TestStorageClient{} + c := NewTestStorageClient() c.expectSamples(samples[:len(samples)/2]) - m := NewStorageQueueManager(c, len(samples)/2) + m := NewStorageQueueManager(c, &cfg) // These should be received by the client. for _, s := range samples[:len(samples)/2] { @@ -85,6 +114,39 @@ func TestSampleDelivery(t *testing.T) { c.waitForExpectedSamples(t) } +func TestSampleDeliveryOrder(t *testing.T) { + cfg := defaultConfig + ts := 10 + n := cfg.MaxSamplesPerSend * ts + // Ensure we don't drop samples in this test. + cfg.QueueCapacity = n + + samples := make(model.Samples, 0, n) + for i := 0; i < n; i++ { + name := model.LabelValue(fmt.Sprintf("test_metric_%d", i%ts)) + samples = append(samples, &model.Sample{ + Metric: model.Metric{ + model.MetricNameLabel: name, + }, + Value: model.SampleValue(i), + Timestamp: model.Time(i), + }) + } + + c := NewTestStorageClient() + c.expectSamples(samples) + m := NewStorageQueueManager(c, &cfg) + + // These should be received by the client. + for _, s := range samples { + m.Append(s) + } + go m.Run() + defer m.Stop() + + c.waitForExpectedSamples(t) +} + // TestBlockingStorageClient is a queue_manager StorageClient which will block // on any calls to Store(), until the `block` channel is closed, at which point // the `numCalls` property will contain a count of how many times Store() was @@ -121,24 +183,26 @@ func (c *TestBlockingStorageClient) Name() string { func TestSpawnNotMoreThanMaxConcurrentSendsGoroutines(t *testing.T) { // Our goal is to fully empty the queue: - // `maxSamplesPerSend*maxConcurrentSends` samples should be consumed by the - // semaphore-controlled goroutines, and then another `maxSamplesPerSend` - // should be consumed by the Run() loop calling sendSample and immediately - // blocking. - n := maxSamplesPerSend*maxConcurrentSends + maxSamplesPerSend + // `MaxSamplesPerSend*Shards` samples should be consumed by the + // per-shard goroutines, and then another `MaxSamplesPerSend` + // should be left on the queue. + cfg := defaultConfig + n := cfg.MaxSamplesPerSend*cfg.Shards + cfg.MaxSamplesPerSend + cfg.QueueCapacity = n samples := make(model.Samples, 0, n) for i := 0; i < n; i++ { + name := model.LabelValue(fmt.Sprintf("test_metric_%d", i)) samples = append(samples, &model.Sample{ Metric: model.Metric{ - model.MetricNameLabel: "test_metric", + model.MetricNameLabel: name, }, Value: model.SampleValue(i), }) } c := NewTestBlockedStorageClient() - m := NewStorageQueueManager(c, n) + m := NewStorageQueueManager(c, &cfg) go m.Run() @@ -151,7 +215,7 @@ func TestSpawnNotMoreThanMaxConcurrentSendsGoroutines(t *testing.T) { m.Append(s) } - // Wait until the Run() loop drains the queue. If things went right, it + // Wait until the runShard() loops drain the queue. If things went right, it // should then immediately block in sendSamples(), but, in case of error, // it would spawn too many goroutines, and thus we'd see more calls to // client.Store() @@ -163,19 +227,18 @@ func TestSpawnNotMoreThanMaxConcurrentSendsGoroutines(t *testing.T) { // draining the queue. We cap the waiting at 1 second -- that should give // plenty of time, and keeps the failure fairly quick if we're not draining // the queue properly. - for i := 0; i < 100 && len(m.queue) > 0; i++ { + for i := 0; i < 100 && m.queueLen() > 0; i++ { time.Sleep(10 * time.Millisecond) } - if len(m.queue) > 0 { + if m.queueLen() != cfg.MaxSamplesPerSend { t.Fatalf("Failed to drain StorageQueueManager queue, %d elements left", - len(m.queue), + m.queueLen(), ) } numCalls := c.NumCalls() - if numCalls != maxConcurrentSends { - t.Errorf("Saw %d concurrent sends, expected %d", numCalls, maxConcurrentSends) + if numCalls != uint64(cfg.Shards) { + t.Errorf("Saw %d concurrent sends, expected %d", numCalls, cfg.Shards) } - } diff --git a/storage/remote/remote.go b/storage/remote/remote.go index 2c1923b79b..e8786829d8 100644 --- a/storage/remote/remote.go +++ b/storage/remote/remote.go @@ -52,11 +52,11 @@ func New(o *Options) (*Storage, error) { c := graphite.NewClient( o.GraphiteAddress, o.GraphiteTransport, o.StorageTimeout, o.GraphitePrefix) - s.queues = append(s.queues, NewStorageQueueManager(c, 100*1024)) + s.queues = append(s.queues, NewStorageQueueManager(c, nil)) } if o.OpentsdbURL != "" { c := opentsdb.NewClient(o.OpentsdbURL, o.StorageTimeout) - s.queues = append(s.queues, NewStorageQueueManager(c, 100*1024)) + s.queues = append(s.queues, NewStorageQueueManager(c, nil)) } if o.InfluxdbURL != nil { conf := influx.Config{ @@ -67,7 +67,7 @@ func New(o *Options) (*Storage, error) { } c := influxdb.NewClient(conf, o.InfluxdbDatabase, o.InfluxdbRetentionPolicy) prometheus.MustRegister(c) - s.queues = append(s.queues, NewStorageQueueManager(c, 100*1024)) + s.queues = append(s.queues, NewStorageQueueManager(c, nil)) } if o.Address != "" { c, err := NewClient(o.Address, o.StorageTimeout)