Shard/parallelise samples by fingerprint in StorageQueueManager
By splitting the single queue into multiple queues and flushing each individual queue serially (and all queues in parallel), we can guarantee that the order of timestamps in samples sent to downstream systems is preserved.
This commit is contained in:
parent fe29e87824
commit ece12bff93
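The scheme described in the commit message can be illustrated with a small, self-contained Go sketch (not the patch itself): samples are routed to a shard by hashing their series identity, each shard is a buffered channel drained by exactly one goroutine, so samples of a given series are sent in arrival order while different shards send in parallel. The sample type, the FNV hash standing in for the metric fingerprint, and the queue sizes below are illustrative assumptions, not code from this commit.

package main

import (
    "fmt"
    "hash/fnv"
    "sync"
)

// sample is a simplified stand-in for model.Sample: one value of one series.
type sample struct {
    series    string
    timestamp int64
}

// shardedQueue assigns every series to a fixed shard, so samples of the same
// series are always flushed by the same goroutine, in the order they arrived.
type shardedQueue struct {
    shards []chan sample
    wg     sync.WaitGroup
}

func newShardedQueue(shards, capacity int) *shardedQueue {
    q := &shardedQueue{shards: make([]chan sample, shards)}
    for i := range q.shards {
        q.shards[i] = make(chan sample, capacity)
    }
    q.wg.Add(shards)
    return q
}

// append picks the shard from a hash of the series labels (a stand-in for the
// fingerprint-modulo-shards scheme described above) and drops the sample if
// that shard's buffer is full.
func (q *shardedQueue) append(s sample) bool {
    h := fnv.New64a()
    h.Write([]byte(s.series))
    shard := h.Sum64() % uint64(len(q.shards))
    select {
    case q.shards[shard] <- s:
        return true
    default:
        return false // queue full: drop on the floor rather than block
    }
}

// run starts one sender per shard; shards proceed in parallel, but each shard
// sends its own samples strictly serially.
func (q *shardedQueue) run(send func(sample)) {
    for _, shard := range q.shards {
        go func(ch chan sample) {
            defer q.wg.Done()
            for s := range ch {
                send(s)
            }
        }(shard)
    }
}

// stop closes all shards and waits for the senders to drain them.
func (q *shardedQueue) stop() {
    for _, shard := range q.shards {
        close(shard)
    }
    q.wg.Wait()
}

func main() {
    q := newShardedQueue(4, 16)
    q.run(func(s sample) { fmt.Println(s.series, s.timestamp) })
    for t := int64(0); t < 8; t++ {
        q.append(sample{series: "http_requests_total", timestamp: t})
    }
    q.stop()
}

Because a given series always hashes to the same shard and each shard is flushed by a single goroutine, in-flight requests cannot reorder that series' samples; dropping on a full shard mirrors the queue manager's drop-on-overflow behaviour instead of blocking the caller.
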
@@ -14,6 +14,7 @@
 package remote

 import (
+    "sync"
     "time"

     "github.com/prometheus/client_golang/prometheus"
@@ -21,16 +22,6 @@ import (
     "github.com/prometheus/common/model"
 )

-const (
-    // The maximum number of concurrent send requests to the remote storage.
-    maxConcurrentSends = 10
-    // The maximum number of samples to fit into a single request to the remote storage.
-    maxSamplesPerSend = 100
-    // The deadline after which to send queued samples even if the maximum batch
-    // size has not been reached.
-    batchSendDeadline = 5 * time.Second
-)
-
 // String constants for instrumentation.
 const (
     namespace = "prometheus"
@@ -51,14 +42,28 @@ type StorageClient interface {
     Name() string
 }

+type StorageQueueManagerConfig struct {
+    QueueCapacity     int           // Number of samples to buffer per shard before we start dropping them.
+    Shards            int           // Number of shards, i.e. amount of concurrency.
+    MaxSamplesPerSend int           // Maximum number of samples per send.
+    BatchSendDeadline time.Duration // Maximum time sample will wait in buffer.
+}
+
+var defaultConfig = StorageQueueManagerConfig{
+    QueueCapacity:     100 * 1024 / 10,
+    Shards:            10,
+    MaxSamplesPerSend: 100,
+    BatchSendDeadline: 5 * time.Second,
+}
+
 // StorageQueueManager manages a queue of samples to be sent to the Storage
 // indicated by the provided StorageClient.
 type StorageQueueManager struct {
+    cfg    StorageQueueManagerConfig
     tsdb   StorageClient
-    queue          chan *model.Sample
-    pendingSamples model.Samples
-    sendSemaphore  chan bool
-    drained        chan bool
+    shards []chan *model.Sample
+    wg     sync.WaitGroup
+    done   chan struct{}

     samplesCount *prometheus.CounterVec
     sendLatency  prometheus.Summary
@@ -69,16 +74,25 @@ type StorageQueueManager struct {
 }

 // NewStorageQueueManager builds a new StorageQueueManager.
-func NewStorageQueueManager(tsdb StorageClient, queueCapacity int) *StorageQueueManager {
+func NewStorageQueueManager(tsdb StorageClient, cfg *StorageQueueManagerConfig) *StorageQueueManager {
     constLabels := prometheus.Labels{
         "type": tsdb.Name(),
     }

-    return &StorageQueueManager{
+    if cfg == nil {
+        cfg = &defaultConfig
+    }
+
+    shards := make([]chan *model.Sample, cfg.Shards)
+    for i := 0; i < cfg.Shards; i++ {
+        shards[i] = make(chan *model.Sample, cfg.QueueCapacity)
+    }
+
+    t := &StorageQueueManager{
+        cfg:    *cfg,
         tsdb:   tsdb,
-        queue:         make(chan *model.Sample, queueCapacity),
-        sendSemaphore: make(chan bool, maxConcurrentSends),
-        drained:       make(chan bool),
+        shards: shards,
+        done:   make(chan struct{}),

         samplesCount: prometheus.NewCounterVec(
             prometheus.CounterOpts{
@@ -126,17 +140,23 @@ func NewStorageQueueManager(tsdb StorageClient, queueCapacity int) *StorageQueue
                 constLabels,
             ),
             prometheus.GaugeValue,
-            float64(queueCapacity),
+            float64(cfg.QueueCapacity),
         ),
     }
+
+    t.wg.Add(cfg.Shards)
+    return t
 }

 // Append queues a sample to be sent to the remote storage. It drops the
 // sample on the floor if the queue is full.
 // Always returns nil.
 func (t *StorageQueueManager) Append(s *model.Sample) error {
+    fp := s.Metric.FastFingerprint()
+    shard := uint64(fp) % uint64(t.cfg.Shards)
+
     select {
-    case t.queue <- s:
+    case t.shards[shard] <- s:
     default:
         t.samplesCount.WithLabelValues(dropped).Inc()
         log.Warn("Remote storage queue full, discarding sample.")
@@ -144,16 +164,11 @@ func (t *StorageQueueManager) Append(s *model.Sample) error {
     return nil
 }

-// Stop stops sending samples to the remote storage and waits for pending
-// sends to complete.
-func (t *StorageQueueManager) Stop() {
-    log.Infof("Stopping remote storage...")
-    close(t.queue)
-    <-t.drained
-    for i := 0; i < maxConcurrentSends; i++ {
-        t.sendSemaphore <- true
-    }
-    log.Info("Remote storage stopped.")
+// NeedsThrottling implements storage.SampleAppender. It will always return
+// false as a remote storage drops samples on the floor if backlogging instead
+// of asking for throttling.
+func (*StorageQueueManager) NeedsThrottling() bool {
+    return false
 }

 // Describe implements prometheus.Collector.
@@ -166,25 +181,82 @@ func (t *StorageQueueManager) Describe(ch chan<- *prometheus.Desc) {
     ch <- t.queueCapacity.Desc()
 }

+// QueueLength returns the number of outstanding samples in the queue.
+func (t *StorageQueueManager) queueLen() int {
+    queueLength := 0
+    for _, shard := range t.shards {
+        queueLength += len(shard)
+    }
+    return queueLength
+}
+
 // Collect implements prometheus.Collector.
 func (t *StorageQueueManager) Collect(ch chan<- prometheus.Metric) {
     t.samplesCount.Collect(ch)
     t.sendLatency.Collect(ch)
-    t.queueLength.Set(float64(len(t.queue)))
+    t.queueLength.Set(float64(t.queueLen()))
     ch <- t.failedBatches
     ch <- t.failedSamples
     ch <- t.queueLength
     ch <- t.queueCapacity
 }

+// Run continuously sends samples to the remote storage.
+func (t *StorageQueueManager) Run() {
+    for i := 0; i < t.cfg.Shards; i++ {
+        go t.runShard(i)
+    }
+    t.wg.Wait()
+}
+
+// Stop stops sending samples to the remote storage and waits for pending
+// sends to complete.
+func (t *StorageQueueManager) Stop() {
+    log.Infof("Stopping remote storage...")
+    for _, shard := range t.shards {
+        close(shard)
+    }
+    t.wg.Wait()
+    log.Info("Remote storage stopped.")
+}
+
+func (t *StorageQueueManager) runShard(i int) {
+    defer t.wg.Done()
+    shard := t.shards[i]
+
+    // Send batches of at most MaxSamplesPerSend samples to the remote storage.
+    // If we have fewer samples than that, flush them out after a deadline
+    // anyways.
+    pendingSamples := model.Samples{}
+
+    for {
+        select {
+        case s, ok := <-shard:
+            if !ok {
+                if len(pendingSamples) > 0 {
+                    log.Infof("Flushing %d samples to remote storage...", len(pendingSamples))
+                    t.sendSamples(pendingSamples)
+                    log.Infof("Done flushing.")
+                }
+                return
+            }
+
+            pendingSamples = append(pendingSamples, s)
+
+            for len(pendingSamples) >= t.cfg.MaxSamplesPerSend {
+                t.sendSamples(pendingSamples[:t.cfg.MaxSamplesPerSend])
+                pendingSamples = pendingSamples[t.cfg.MaxSamplesPerSend:]
+            }
+        case <-time.After(t.cfg.BatchSendDeadline):
+            if len(pendingSamples) > 0 {
+                t.sendSamples(pendingSamples)
+                pendingSamples = pendingSamples[:0]
+            }
+        }
+    }
+}
+
 func (t *StorageQueueManager) sendSamples(s model.Samples) {
-    t.sendSemaphore <- true
-
-    go func() {
-        defer func() {
-            <-t.sendSemaphore
-        }()
-
     // Samples are sent to the remote storage on a best-effort basis. If a
     // sample isn't sent correctly the first time, it's simply dropped on the
     // floor.
@@ -201,44 +273,4 @@ func (t *StorageQueueManager) sendSamples(s model.Samples) {
     }
     t.samplesCount.WithLabelValues(labelValue).Add(float64(len(s)))
     t.sendLatency.Observe(duration)
-    }()
 }
-
-// Run continuously sends samples to the remote storage.
-func (t *StorageQueueManager) Run() {
-    defer func() {
-        close(t.drained)
-    }()
-
-    // Send batches of at most maxSamplesPerSend samples to the remote storage.
-    // If we have fewer samples than that, flush them out after a deadline
-    // anyways.
-    for {
-        select {
-        case s, ok := <-t.queue:
-            if !ok {
-                log.Infof("Flushing %d samples to remote storage...", len(t.pendingSamples))
-                t.flush()
-                log.Infof("Done flushing.")
-                return
-            }
-
-            t.pendingSamples = append(t.pendingSamples, s)
-
-            for len(t.pendingSamples) >= maxSamplesPerSend {
-                t.sendSamples(t.pendingSamples[:maxSamplesPerSend])
-                t.pendingSamples = t.pendingSamples[maxSamplesPerSend:]
-            }
-        case <-time.After(batchSendDeadline):
-            t.flush()
-        }
-    }
-}
-
-// Flush flushes remaining queued samples.
-func (t *StorageQueueManager) flush() {
-    if len(t.pendingSamples) > 0 {
-        t.sendSamples(t.pendingSamples)
-    }
-    t.pendingSamples = t.pendingSamples[:0]
-}

@@ -14,6 +14,7 @@
 package remote

 import (
+    "fmt"
     "sync"
     "sync/atomic"
     "testing"
@@ -23,28 +24,53 @@
 )

 type TestStorageClient struct {
-    receivedSamples model.Samples
-    expectedSamples model.Samples
+    receivedSamples map[string]model.Samples
+    expectedSamples map[string]model.Samples
     wg              sync.WaitGroup
+    mtx             sync.Mutex
 }

-func (c *TestStorageClient) expectSamples(s model.Samples) {
-    c.expectedSamples = append(c.expectedSamples, s...)
-    c.wg.Add(len(s))
+func NewTestStorageClient() *TestStorageClient {
+    return &TestStorageClient{
+        receivedSamples: map[string]model.Samples{},
+        expectedSamples: map[string]model.Samples{},
+    }
+}
+
+func (c *TestStorageClient) expectSamples(ss model.Samples) {
+    c.mtx.Lock()
+    defer c.mtx.Unlock()
+
+    for _, s := range ss {
+        ts := s.Metric.String()
+        c.expectedSamples[ts] = append(c.expectedSamples[ts], s)
+    }
+    c.wg.Add(len(ss))
 }

 func (c *TestStorageClient) waitForExpectedSamples(t *testing.T) {
     c.wg.Wait()
-    for i, expected := range c.expectedSamples {
-        if !expected.Equal(c.receivedSamples[i]) {
-            t.Fatalf("%d. Expected %v, got %v", i, expected, c.receivedSamples[i])
+
+    c.mtx.Lock()
+    defer c.mtx.Unlock()
+    for ts, expectedSamples := range c.expectedSamples {
+        for i, expected := range expectedSamples {
+            if !expected.Equal(c.receivedSamples[ts][i]) {
+                t.Fatalf("%d. Expected %v, got %v", i, expected, c.receivedSamples[ts][i])
+            }
         }
     }
 }

-func (c *TestStorageClient) Store(s model.Samples) error {
-    c.receivedSamples = append(c.receivedSamples, s...)
-    c.wg.Add(-len(s))
+func (c *TestStorageClient) Store(ss model.Samples) error {
+    c.mtx.Lock()
+    defer c.mtx.Unlock()
+
+    for _, s := range ss {
+        ts := s.Metric.String()
+        c.receivedSamples[ts] = append(c.receivedSamples[ts], s)
+    }
+    c.wg.Add(-len(ss))
     return nil
 }

@@ -55,21 +81,24 @@ func (c *TestStorageClient) Name() string {
 func TestSampleDelivery(t *testing.T) {
     // Let's create an even number of send batches so we don't run into the
     // batch timeout case.
-    n := maxSamplesPerSend * 2
+    cfg := defaultConfig
+    n := cfg.QueueCapacity * 2
+    cfg.Shards = 1

     samples := make(model.Samples, 0, n)
     for i := 0; i < n; i++ {
+        name := model.LabelValue(fmt.Sprintf("test_metric_%d", i))
         samples = append(samples, &model.Sample{
             Metric: model.Metric{
-                model.MetricNameLabel: "test_metric",
+                model.MetricNameLabel: name,
             },
             Value: model.SampleValue(i),
         })
     }

-    c := &TestStorageClient{}
+    c := NewTestStorageClient()
     c.expectSamples(samples[:len(samples)/2])
-    m := NewStorageQueueManager(c, len(samples)/2)
+    m := NewStorageQueueManager(c, &cfg)

     // These should be received by the client.
     for _, s := range samples[:len(samples)/2] {
@@ -85,6 +114,39 @@ func TestSampleDelivery(t *testing.T) {
     c.waitForExpectedSamples(t)
 }

+func TestSampleDeliveryOrder(t *testing.T) {
+    cfg := defaultConfig
+    ts := 10
+    n := cfg.MaxSamplesPerSend * ts
+    // Ensure we don't drop samples in this test.
+    cfg.QueueCapacity = n
+
+    samples := make(model.Samples, 0, n)
+    for i := 0; i < n; i++ {
+        name := model.LabelValue(fmt.Sprintf("test_metric_%d", i%ts))
+        samples = append(samples, &model.Sample{
+            Metric: model.Metric{
+                model.MetricNameLabel: name,
+            },
+            Value:     model.SampleValue(i),
+            Timestamp: model.Time(i),
+        })
+    }
+
+    c := NewTestStorageClient()
+    c.expectSamples(samples)
+    m := NewStorageQueueManager(c, &cfg)
+
+    // These should be received by the client.
+    for _, s := range samples {
+        m.Append(s)
+    }
+    go m.Run()
+    defer m.Stop()
+
+    c.waitForExpectedSamples(t)
+}
+
 // TestBlockingStorageClient is a queue_manager StorageClient which will block
 // on any calls to Store(), until the `block` channel is closed, at which point
 // the `numCalls` property will contain a count of how many times Store() was
@@ -121,24 +183,26 @@ func (c *TestBlockingStorageClient) Name() string {

 func TestSpawnNotMoreThanMaxConcurrentSendsGoroutines(t *testing.T) {
     // Our goal is to fully empty the queue:
-    // `maxSamplesPerSend*maxConcurrentSends` samples should be consumed by the
-    // semaphore-controlled goroutines, and then another `maxSamplesPerSend`
-    // should be consumed by the Run() loop calling sendSample and immediately
-    // blocking.
-    n := maxSamplesPerSend*maxConcurrentSends + maxSamplesPerSend
+    // `MaxSamplesPerSend*Shards` samples should be consumed by the
+    // per-shard goroutines, and then another `MaxSamplesPerSend`
+    // should be left on the queue.
+    cfg := defaultConfig
+    n := cfg.MaxSamplesPerSend*cfg.Shards + cfg.MaxSamplesPerSend
+    cfg.QueueCapacity = n

     samples := make(model.Samples, 0, n)
     for i := 0; i < n; i++ {
+        name := model.LabelValue(fmt.Sprintf("test_metric_%d", i))
         samples = append(samples, &model.Sample{
             Metric: model.Metric{
-                model.MetricNameLabel: "test_metric",
+                model.MetricNameLabel: name,
             },
             Value: model.SampleValue(i),
         })
     }

     c := NewTestBlockedStorageClient()
-    m := NewStorageQueueManager(c, n)
+    m := NewStorageQueueManager(c, &cfg)

     go m.Run()

@@ -151,7 +215,7 @@ func TestSpawnNotMoreThanMaxConcurrentSendsGoroutines(t *testing.T) {
         m.Append(s)
     }

-    // Wait until the Run() loop drains the queue. If things went right, it
+    // Wait until the runShard() loops drain the queue. If things went right, it
     // should then immediately block in sendSamples(), but, in case of error,
     // it would spawn too many goroutines, and thus we'd see more calls to
     // client.Store()
@@ -163,19 +227,18 @@ func TestSpawnNotMoreThanMaxConcurrentSendsGoroutines(t *testing.T) {
     // draining the queue. We cap the waiting at 1 second -- that should give
     // plenty of time, and keeps the failure fairly quick if we're not draining
     // the queue properly.
-    for i := 0; i < 100 && len(m.queue) > 0; i++ {
+    for i := 0; i < 100 && m.queueLen() > 0; i++ {
        time.Sleep(10 * time.Millisecond)
     }

-    if len(m.queue) > 0 {
+    if m.queueLen() != cfg.MaxSamplesPerSend {
        t.Fatalf("Failed to drain StorageQueueManager queue, %d elements left",
-            len(m.queue),
+            m.queueLen(),
        )
     }

     numCalls := c.NumCalls()
-    if numCalls != maxConcurrentSends {
-        t.Errorf("Saw %d concurrent sends, expected %d", numCalls, maxConcurrentSends)
+    if numCalls != uint64(cfg.Shards) {
+        t.Errorf("Saw %d concurrent sends, expected %d", numCalls, cfg.Shards)
     }
-
 }

@@ -52,11 +52,11 @@ func New(o *Options) (*Storage, error) {
         c := graphite.NewClient(
             o.GraphiteAddress, o.GraphiteTransport,
             o.StorageTimeout, o.GraphitePrefix)
-        s.queues = append(s.queues, NewStorageQueueManager(c, 100*1024))
+        s.queues = append(s.queues, NewStorageQueueManager(c, nil))
     }
     if o.OpentsdbURL != "" {
         c := opentsdb.NewClient(o.OpentsdbURL, o.StorageTimeout)
-        s.queues = append(s.queues, NewStorageQueueManager(c, 100*1024))
+        s.queues = append(s.queues, NewStorageQueueManager(c, nil))
     }
     if o.InfluxdbURL != nil {
         conf := influx.Config{
@@ -67,7 +67,7 @@ func New(o *Options) (*Storage, error) {
         }
         c := influxdb.NewClient(conf, o.InfluxdbDatabase, o.InfluxdbRetentionPolicy)
         prometheus.MustRegister(c)
-        s.queues = append(s.queues, NewStorageQueueManager(c, 100*1024))
+        s.queues = append(s.queues, NewStorageQueueManager(c, nil))
     }
     if o.Address != "" {
         c, err := NewClient(o.Address, o.StorageTimeout)