prometheus/storage/remote/queue_manager.go
Bjoern Rabenstein 6bc083f38b Major code cleanup in storage.
- Mostly docstring fixes/additions.
  (Please review these carefully, since most of them were missing, I
  had to guess them from an outsider's perspective. (Which on the
  other hand proves how desperately required many of these docstrings
  are.))

- Removed all uses of new(...) to meet our own style guide (draft).

- Fixed all other 'go vet' and 'golint' issues (except those that are
  not fixable (i.e. caused by bugs in or by design of 'go vet' and
  'golint')).

- Some trivial refactorings, like reorder functions, minor renames, ...

- Some slightly less trivial refactoring, mostly to reduce code
  duplication by embedding types instead of writing many explicit
  forwarders.

- Cleaned up the interface structure a bit. (Most significant probably
  the removal of the View-like methods from MetricPersistence. Now they
  are only in View and not duplicated anymore.)

- Removed dead code. (Probably not all of it, but it's a first
  step...)

- Fixed a leftover in storage/metric/end_to_end_test.go (that made
  some parts of the code never execute (incidentally, those parts
  were broken (and I fixed them, too))).

Change-Id: Ibcac069940d118a88f783314f5b4595dce6641d5
2014-02-27 15:22:37 +01:00

150 lines
4.3 KiB
Go

// Copyright 2013 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package remote
import (
"time"
"github.com/golang/glog"
clientmodel "github.com/prometheus/client_golang/model"
)
// Tuning parameters for batching and sending samples to the TSDB.
const (
// The maximum number of concurrent send requests to the TSDB. It is used as
// the capacity of the send semaphore, and Close acquires this many slots to
// wait for in-flight sends.
maxConcurrentSends = 10
// The maximum number of samples to fit into a single request to the TSDB.
// Run cuts pending samples into batches of exactly this size.
maxSamplesPerSend = 100
// The deadline after which to send queued samples even if the maximum batch
// size has not been reached. Run re-arms this timeout on every loop
// iteration, so it measures time since the last batch was received.
batchSendDeadline = 5 * time.Second
)
// TSDBClient defines an interface for sending a batch of samples to an
// external timeseries database (TSDB).
type TSDBClient interface {
// Store sends the given batch of samples. The queue manager logs a non-nil
// error and does not retry the batch.
Store(clientmodel.Samples) error
}
// TSDBQueueManager manages a queue of samples to be sent to the TSDB indicated
// by the provided TSDBClient.
type TSDBQueueManager struct {
// tsdb is the client through which sendSamples stores each batch.
tsdb TSDBClient
// queue carries sample batches from Queue to Run. Closing it (see Close)
// tells Run to flush and return.
queue chan clientmodel.Samples
// pendingSamples holds samples Run has received but not yet handed to a
// send.
pendingSamples clientmodel.Samples
// sendSemaphore bounds the number of concurrent Store calls: sendSamples
// puts a value in before storing and takes one out afterwards.
sendSemaphore chan bool
// drained is closed when Run returns; Close waits on it.
drained chan bool
}
// NewTSDBQueueManager returns a TSDBQueueManager that sends samples through
// the given client. queueCapacity is the number of sample batches the queue
// can buffer before Queue starts dropping them.
func NewTSDBQueueManager(tsdb TSDBClient, queueCapacity int) *TSDBQueueManager {
	manager := &TSDBQueueManager{tsdb: tsdb}
	manager.queue = make(chan clientmodel.Samples, queueCapacity)
	manager.sendSemaphore = make(chan bool, maxConcurrentSends)
	manager.drained = make(chan bool)
	return manager
}
// Queue enqueues a sample batch to be sent to the TSDB without blocking. If
// the queue is full, the batch passed in is discarded: the drop is counted in
// samplesCount and a warning is logged.
func (t *TSDBQueueManager) Queue(s clientmodel.Samples) {
	select {
	case t.queue <- s:
		// Enqueued successfully; nothing more to do.
		return
	default:
	}

	discarded := len(s)
	samplesCount.IncrementBy(map[string]string{result: dropped}, float64(discarded))
	glog.Warningf("TSDB queue full, discarding %d samples", discarded)
}
// sendSamples stores one batch of samples in the TSDB. It blocks until a
// semaphore slot is free, so at most cap(t.sendSemaphore) stores run at once,
// and releases the slot when it returns. The outcome is passed to
// recordOutcome.
//
// Sending is best-effort: a batch that fails to store is logged and dropped,
// never retried.
func (t *TSDBQueueManager) sendSamples(s clientmodel.Samples) {
	t.sendSemaphore <- true
	defer func() { <-t.sendSemaphore }()

	start := time.Now()
	storeErr := t.tsdb.Store(s)
	elapsed := time.Since(start)

	recordOutcome(elapsed, len(s), storeErr)
	if storeErr == nil {
		return
	}
	glog.Warningf("error sending %d samples to TSDB: %s", len(s), storeErr)
}
// reportQueues publishes the sample queue's current occupancy (number of
// buffered batches) and its capacity to the queueSize metric.
func (t *TSDBQueueManager) reportQueues() {
	used, total := len(t.queue), cap(t.queue)
	queueSize.Set(map[string]string{facet: occupancy}, float64(used))
	queueSize.Set(map[string]string{facet: capacity}, float64(total))
}
// Run continuously sends samples to the TSDB. It blocks until the queue is
// closed, then sends whatever samples are still pending and returns. The
// drained channel is closed on return so that waiters know the queue has been
// fully consumed.
//
// While running, queue occupancy and capacity are reported once per second.
func (t *TSDBQueueManager) Run() {
	defer close(t.drained)

	// Ticker.Stop does not close the ticker's channel, so a goroutine that
	// merely ranges over it would block forever once Run returns. The reporter
	// therefore also listens on stopReporting, which is closed on return.
	queueReportTicker := time.NewTicker(time.Second)
	stopReporting := make(chan struct{})
	go func() {
		for {
			select {
			case <-queueReportTicker.C:
				t.reportQueues()
			case <-stopReporting:
				return
			}
		}
	}()
	defer func() {
		queueReportTicker.Stop()
		close(stopReporting)
	}()

	// Send batches of at most maxSamplesPerSend samples to the TSDB. If we
	// have fewer samples than that, flush them out after a deadline anyways.
	for {
		select {
		case s, ok := <-t.queue:
			if !ok {
				// The queue has been closed: shut down. The final batch is
				// sent synchronously so that it has really been handed to
				// the TSDB before drained is closed. Sending it from a new
				// goroutine would race with anyone who grabs all semaphore
				// slots right after drained is closed, which could leave the
				// batch unsent forever.
				glog.Infof("Flushing %d samples to OpenTSDB...", len(t.pendingSamples))
				if len(t.pendingSamples) > 0 {
					t.sendSamples(t.pendingSamples)
					t.pendingSamples = nil
				}
				glog.Infof("Done flushing.")
				return
			}
			t.pendingSamples = append(t.pendingSamples, s...)
			// Cut off full batches from the front. The remainder starts
			// past the sent region, so later appends never touch samples
			// that are still being sent.
			for len(t.pendingSamples) >= maxSamplesPerSend {
				go t.sendSamples(t.pendingSamples[:maxSamplesPerSend])
				t.pendingSamples = t.pendingSamples[maxSamplesPerSend:]
			}
		// The timeout is re-armed on every loop iteration, i.e. it fires
		// once no batch has been received for batchSendDeadline.
		case <-time.After(batchSendDeadline):
			t.flush()
		}
	}
}
// flush hands all pending samples to a background send and starts over with
// an empty pending buffer. It is a no-op if nothing is pending.
func (t *TSDBQueueManager) flush() {
	if len(t.pendingSamples) == 0 {
		return
	}
	go t.sendSamples(t.pendingSamples)
	// The sending goroutine now owns the backing array. Resetting with [:0]
	// would make subsequent appends overwrite samples that are still waiting
	// to be sent, so drop the reference and let append allocate a new array.
	t.pendingSamples = nil
}
// Close shuts the queue manager down. It closes the queue, waits for the
// drained channel to be closed, and then occupies every one of the
// maxConcurrentSends semaphore slots, which blocks until sends holding a slot
// have released it. The slots are not released again afterwards.
func (t *TSDBQueueManager) Close() {
	glog.Infof("TSDB queue manager shutting down...")

	close(t.queue)
	<-t.drained

	for remaining := maxConcurrentSends; remaining > 0; remaining-- {
		t.sendSemaphore <- true
	}
}