Rename StorageQueueManager -> QueueManager

This commit is contained in:
Julius Volz 2017-02-21 21:45:43 +01:00
parent e9476b35d5
commit 2f39dbc8b3
3 changed files with 22 additions and 22 deletions

View file

@ -113,8 +113,8 @@ type StorageClient interface {
Name() string Name() string
} }
// StorageQueueManagerConfig configures a storage queue. // QueueManagerConfig configures a storage queue.
type StorageQueueManagerConfig struct { type QueueManagerConfig struct {
QueueCapacity int // Number of samples to buffer per shard before we start dropping them. QueueCapacity int // Number of samples to buffer per shard before we start dropping them.
Shards int // Number of shards, i.e. amount of concurrency. Shards int // Number of shards, i.e. amount of concurrency.
MaxSamplesPerSend int // Maximum number of samples per send. MaxSamplesPerSend int // Maximum number of samples per send.
@ -124,18 +124,18 @@ type StorageQueueManagerConfig struct {
Client StorageClient Client StorageClient
} }
// StorageQueueManager manages a queue of samples to be sent to the Storage // QueueManager manages a queue of samples to be sent to the Storage
// indicated by the provided StorageClient. // indicated by the provided StorageClient.
type StorageQueueManager struct { type QueueManager struct {
cfg StorageQueueManagerConfig cfg QueueManagerConfig
shards []chan *model.Sample shards []chan *model.Sample
wg sync.WaitGroup wg sync.WaitGroup
done chan struct{} done chan struct{}
queueName string queueName string
} }
// NewStorageQueueManager builds a new StorageQueueManager. // NewQueueManager builds a new QueueManager.
func NewStorageQueueManager(cfg StorageQueueManagerConfig) *StorageQueueManager { func NewQueueManager(cfg QueueManagerConfig) *QueueManager {
if cfg.QueueCapacity == 0 { if cfg.QueueCapacity == 0 {
cfg.QueueCapacity = defaultQueueCapacity cfg.QueueCapacity = defaultQueueCapacity
} }
@ -154,7 +154,7 @@ func NewStorageQueueManager(cfg StorageQueueManagerConfig) *StorageQueueManager
shards[i] = make(chan *model.Sample, cfg.QueueCapacity) shards[i] = make(chan *model.Sample, cfg.QueueCapacity)
} }
t := &StorageQueueManager{ t := &QueueManager{
cfg: cfg, cfg: cfg,
shards: shards, shards: shards,
done: make(chan struct{}), done: make(chan struct{}),
@ -169,7 +169,7 @@ func NewStorageQueueManager(cfg StorageQueueManagerConfig) *StorageQueueManager
// Append queues a sample to be sent to the remote storage. It drops the // Append queues a sample to be sent to the remote storage. It drops the
// sample on the floor if the queue is full. // sample on the floor if the queue is full.
// Always returns nil. // Always returns nil.
func (t *StorageQueueManager) Append(s *model.Sample) error { func (t *QueueManager) Append(s *model.Sample) error {
var snew model.Sample var snew model.Sample
snew = *s snew = *s
snew.Metric = s.Metric.Clone() snew.Metric = s.Metric.Clone()
@ -203,13 +203,13 @@ func (t *StorageQueueManager) Append(s *model.Sample) error {
// NeedsThrottling implements storage.SampleAppender. It will always return // NeedsThrottling implements storage.SampleAppender. It will always return
// false as a remote storage drops samples on the floor if backlogging instead // false as a remote storage drops samples on the floor if backlogging instead
// of asking for throttling. // of asking for throttling.
func (*StorageQueueManager) NeedsThrottling() bool { func (*QueueManager) NeedsThrottling() bool {
return false return false
} }
// Start the queue manager sending samples to the remote storage. // Start the queue manager sending samples to the remote storage.
// Does not block. // Does not block.
func (t *StorageQueueManager) Start() { func (t *QueueManager) Start() {
for i := 0; i < t.cfg.Shards; i++ { for i := 0; i < t.cfg.Shards; i++ {
go t.runShard(i) go t.runShard(i)
} }
@ -217,7 +217,7 @@ func (t *StorageQueueManager) Start() {
// Stop stops sending samples to the remote storage and waits for pending // Stop stops sending samples to the remote storage and waits for pending
// sends to complete. // sends to complete.
func (t *StorageQueueManager) Stop() { func (t *QueueManager) Stop() {
log.Infof("Stopping remote storage...") log.Infof("Stopping remote storage...")
for _, shard := range t.shards { for _, shard := range t.shards {
close(shard) close(shard)
@ -226,7 +226,7 @@ func (t *StorageQueueManager) Stop() {
log.Info("Remote storage stopped.") log.Info("Remote storage stopped.")
} }
func (t *StorageQueueManager) runShard(i int) { func (t *QueueManager) runShard(i int) {
defer t.wg.Done() defer t.wg.Done()
shard := t.shards[i] shard := t.shards[i]
@ -263,7 +263,7 @@ func (t *StorageQueueManager) runShard(i int) {
} }
} }
func (t *StorageQueueManager) sendSamples(s model.Samples) { func (t *QueueManager) sendSamples(s model.Samples) {
// Samples are sent to the remote storage on a best-effort basis. If a // Samples are sent to the remote storage on a best-effort basis. If a
// sample isn't sent correctly the first time, it's simply dropped on the // sample isn't sent correctly the first time, it's simply dropped on the
// floor. // floor.

View file

@ -97,7 +97,7 @@ func TestSampleDelivery(t *testing.T) {
c := NewTestStorageClient() c := NewTestStorageClient()
c.expectSamples(samples[:len(samples)/2]) c.expectSamples(samples[:len(samples)/2])
m := NewStorageQueueManager(StorageQueueManagerConfig{ m := NewQueueManager(QueueManagerConfig{
Client: c, Client: c,
Shards: 1, Shards: 1,
}) })
@ -134,7 +134,7 @@ func TestSampleDeliveryOrder(t *testing.T) {
c := NewTestStorageClient() c := NewTestStorageClient()
c.expectSamples(samples) c.expectSamples(samples)
m := NewStorageQueueManager(StorageQueueManagerConfig{ m := NewQueueManager(QueueManagerConfig{
Client: c, Client: c,
// Ensure we don't drop samples in this test. // Ensure we don't drop samples in this test.
QueueCapacity: n, QueueCapacity: n,
@ -184,7 +184,7 @@ func (c *TestBlockingStorageClient) Name() string {
return "testblockingstorageclient" return "testblockingstorageclient"
} }
func (t *StorageQueueManager) queueLen() int { func (t *QueueManager) queueLen() int {
queueLength := 0 queueLength := 0
for _, shard := range t.shards { for _, shard := range t.shards {
queueLength += len(shard) queueLength += len(shard)
@ -211,7 +211,7 @@ func TestSpawnNotMoreThanMaxConcurrentSendsGoroutines(t *testing.T) {
} }
c := NewTestBlockedStorageClient() c := NewTestBlockedStorageClient()
m := NewStorageQueueManager(StorageQueueManagerConfig{ m := NewQueueManager(QueueManagerConfig{
Client: c, Client: c,
QueueCapacity: n, QueueCapacity: n,
}) })
@ -244,7 +244,7 @@ func TestSpawnNotMoreThanMaxConcurrentSendsGoroutines(t *testing.T) {
} }
if m.queueLen() != defaultMaxSamplesPerSend { if m.queueLen() != defaultMaxSamplesPerSend {
t.Fatalf("Failed to drain StorageQueueManager queue, %d elements left", t.Fatalf("Failed to drain QueueManager queue, %d elements left",
m.queueLen(), m.queueLen(),
) )
} }

View file

@ -24,7 +24,7 @@ import (
// Storage allows queueing samples for remote writes. // Storage allows queueing samples for remote writes.
type Storage struct { type Storage struct {
mtx sync.RWMutex mtx sync.RWMutex
queues []*StorageQueueManager queues []*QueueManager
} }
// ApplyConfig updates the state as the new config requires. // ApplyConfig updates the state as the new config requires.
@ -32,7 +32,7 @@ func (s *Storage) ApplyConfig(conf *config.Config) error {
s.mtx.Lock() s.mtx.Lock()
defer s.mtx.Unlock() defer s.mtx.Unlock()
newQueues := []*StorageQueueManager{} newQueues := []*QueueManager{}
// TODO: we should only stop & recreate queues which have changes, // TODO: we should only stop & recreate queues which have changes,
// as this can be quite disruptive. // as this can be quite disruptive.
for i, rwConf := range conf.RemoteWriteConfigs { for i, rwConf := range conf.RemoteWriteConfigs {
@ -40,7 +40,7 @@ func (s *Storage) ApplyConfig(conf *config.Config) error {
if err != nil { if err != nil {
return err return err
} }
newQueues = append(newQueues, NewStorageQueueManager(StorageQueueManagerConfig{ newQueues = append(newQueues, NewQueueManager(QueueManagerConfig{
Client: c, Client: c,
ExternalLabels: conf.GlobalConfig.ExternalLabels, ExternalLabels: conf.GlobalConfig.ExternalLabels,
RelabelConfigs: rwConf.WriteRelabelConfigs, RelabelConfigs: rwConf.WriteRelabelConfigs,