Merge "Add optional sample replication to OpenTSDB."

This commit is contained in:
Julius Volz 2014-01-08 17:45:08 +01:00 committed by Gerrit Code Review
commit d5ef0c64dc
6 changed files with 473 additions and 9 deletions

41
main.go
View file

@ -30,6 +30,8 @@ import (
"github.com/prometheus/prometheus/retrieval"
"github.com/prometheus/prometheus/rules"
"github.com/prometheus/prometheus/storage/metric"
"github.com/prometheus/prometheus/storage/remote"
"github.com/prometheus/prometheus/storage/remote/opentsdb"
"github.com/prometheus/prometheus/web"
"github.com/prometheus/prometheus/web/api"
)
@ -43,6 +45,9 @@ var (
alertmanagerUrl = flag.String("alertmanager.url", "", "The URL of the alert manager to send notifications to.")
remoteTSDBUrl = flag.String("storage.remote.url", "", "The URL of the OpenTSDB instance to send samples to.")
remoteTSDBTimeout = flag.Duration("storage.remote.timeout", 30*time.Second, "The timeout to use when sending samples to OpenTSDB.")
samplesQueueCapacity = flag.Int("storage.queue.samplesCapacity", 4096, "The size of the unwritten samples queue.")
diskAppendQueueCapacity = flag.Int("storage.queue.diskAppendCapacity", 1000000, "The size of the queue for items that are pending writing to disk.")
memoryAppendQueueCapacity = flag.Int("storage.queue.memoryAppendCapacity", 10000, "The size of the queue for items that are pending writing to memory.")
@ -84,10 +89,11 @@ type prometheus struct {
unwrittenSamples chan *extraction.Result
ruleManager rules.RuleManager
targetManager retrieval.TargetManager
notifications chan notification.NotificationReqs
storage *metric.TieredStorage
ruleManager rules.RuleManager
targetManager retrieval.TargetManager
notifications chan notification.NotificationReqs
storage *metric.TieredStorage
remoteTSDBQueue *remote.TSDBQueueManager
curationState metric.CurationStateUpdater
}
@ -186,6 +192,10 @@ func (p *prometheus) close() {
p.storage.Close()
if p.remoteTSDBQueue != nil {
p.remoteTSDBQueue.Close()
}
close(p.notifications)
close(p.stopBackgroundOperations)
}
@ -212,6 +222,15 @@ func main() {
glog.Fatal("Error opening storage: ", err)
}
var remoteTSDBQueue *remote.TSDBQueueManager = nil
if *remoteTSDBUrl == "" {
glog.Warningf("No TSDB URL provided; not sending any samples to long-term storage")
} else {
openTSDB := opentsdb.NewClient(*remoteTSDBUrl, *remoteTSDBTimeout)
remoteTSDBQueue = remote.NewTSDBQueueManager(openTSDB, 512)
go remoteTSDBQueue.Run()
}
unwrittenSamples := make(chan *extraction.Result, *samplesQueueCapacity)
ingester := &retrieval.MergeLabelsIngester{
Labels: conf.GlobalLabels(),
@ -298,10 +317,11 @@ func main() {
stopBackgroundOperations: make(chan bool, 1),
ruleManager: ruleManager,
targetManager: targetManager,
notifications: notifications,
storage: ts,
ruleManager: ruleManager,
targetManager: targetManager,
notifications: notifications,
storage: ts,
remoteTSDBQueue: remoteTSDBQueue,
}
defer prometheus.close()
@ -368,8 +388,11 @@ func main() {
// TODO(all): Migrate this into prometheus.serve().
for block := range unwrittenSamples {
if block.Err == nil {
if block.Err == nil && len(block.Samples) > 0 {
ts.AppendSamples(block.Samples)
if remoteTSDBQueue != nil {
remoteTSDBQueue.Queue(block.Samples)
}
}
}
}

View file

@ -0,0 +1,55 @@
// Copyright 2013 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package remote
import (
"time"
"github.com/prometheus/client_golang/prometheus"
)
const (
result = "result"
success = "success"
failure = "failure"
dropped = "dropped"
facet = "facet"
occupancy = "occupancy"
capacity = "capacity"
)
var (
samplesCount = prometheus.NewCounter()
sendLatency = prometheus.NewDefaultHistogram()
queueSize = prometheus.NewGauge()
)
func recordOutcome(duration time.Duration, sampleCount int, err error) {
labels := map[string]string{result: success}
if err != nil {
labels[result] = failure
}
samplesCount.IncrementBy(labels, float64(sampleCount))
ms := float64(duration / time.Millisecond)
sendLatency.Add(labels, ms)
}
func init() {
prometheus.Register("prometheus_remote_tsdb_sent_samples_total", "Total number of samples processed to be sent to remote TSDB.", prometheus.NilLabels, samplesCount)
prometheus.Register("prometheus_remote_tsdb_latency_ms", "Latency quantiles for sending samples to the remote TSDB in milliseconds.", prometheus.NilLabels, sendLatency)
prometheus.Register("prometheus_remote_tsdb_queue_size_total", "The size and capacity of the queue of samples to be sent to the remote TSDB.", prometheus.NilLabels, queueSize)
}

View file

@ -0,0 +1,120 @@
package opentsdb
import (
"bytes"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"net/url"
"regexp"
"time"
clientmodel "github.com/prometheus/client_golang/model"
"github.com/prometheus/prometheus/utility"
)
const (
putEndpoint = "/api/put"
contentTypeJson = "application/json"
)
var (
illegalCharsRE = regexp.MustCompile(`[^a-zA-Z0-9_\-./]`)
)
// Client allows sending batches of Prometheus samples to OpenTSDB.
type Client struct {
url string
httpClient *http.Client
}
// Create a new Client.
func NewClient(url string, timeout time.Duration) *Client {
return &Client{
url: url,
httpClient: utility.NewDeadlineClient(timeout),
}
}
// StoreSamplesRequest is used for building a JSON request for storing samples
// via the OpenTSDB.
type StoreSamplesRequest struct {
Metric string `json:"metric"`
Timestamp int64 `json:"timestamp"`
Value clientmodel.SampleValue `json:"value"`
Tags map[string]string `json:"tags"`
}
// Escape Prometheus label values to valid tag values for OpenTSDB.
func escapeTagValue(l clientmodel.LabelValue) string {
return illegalCharsRE.ReplaceAllString(string(l), "_")
}
// Translate Prometheus metric into OpenTSDB tags.
func tagsFromMetric(m clientmodel.Metric) map[string]string {
tags := make(map[string]string, len(m)-1)
for l, v := range m {
if l == clientmodel.MetricNameLabel {
continue
}
tags[string(l)] = escapeTagValue(v)
}
return tags
}
// Send a batch of samples to OpenTSDB via its HTTP API.
func (c *Client) Store(samples clientmodel.Samples) error {
reqs := make([]StoreSamplesRequest, 0, len(samples))
for _, s := range samples {
metric := escapeTagValue(s.Metric[clientmodel.MetricNameLabel])
reqs = append(reqs, StoreSamplesRequest{
Metric: metric,
Timestamp: s.Timestamp.Unix(),
Value: s.Value,
Tags: tagsFromMetric(s.Metric),
})
}
u, err := url.Parse(c.url)
if err != nil {
return err
}
u.Path = putEndpoint
buf, err := json.Marshal(reqs)
if err != nil {
return err
}
resp, err := c.httpClient.Post(
u.String(),
contentTypeJson,
bytes.NewBuffer(buf),
)
if err != nil {
return err
}
defer resp.Body.Close()
// API returns status code 204 for successful writes.
// http://opentsdb.net/docs/build/html/api_http/put.html
if resp.StatusCode == http.StatusNoContent {
return nil
}
// API returns status code 400 on error, encoding error details in the
// response content in JSON.
buf, err = ioutil.ReadAll(resp.Body)
if err != nil {
return err
}
var r map[string]int
if err := json.Unmarshal(buf, &r); err != nil {
return err
}
return fmt.Errorf("Failed to write %d samples to OpenTSDB, %d succeeded", r["failed"], r["success"])
}

View file

@ -0,0 +1,41 @@
// Copyright 2013 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package opentsdb
import (
"testing"
clientmodel "github.com/prometheus/client_golang/model"
)
func TestTagsFromMetric(t *testing.T) {
input := clientmodel.Metric{
clientmodel.MetricNameLabel: "testmetric",
"test:label": "test:value",
"many_chars": "abc!ABC:012-3!45ö67~89./",
}
expected := map[string]string{
"test:label": "test_value",
"many_chars": "abc_ABC_012-3_45_67_89./",
}
actual := tagsFromMetric(input)
if len(actual) != len(expected) {
t.Fatalf("Expected %v, got %v", expected, actual)
}
for k, v := range expected {
if v != actual[k] {
t.Fatalf("Expected %s => %s, got %s => %s", k, v, k, actual[k])
}
}
}

View file

@ -0,0 +1,148 @@
// Copyright 2013 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package remote
import (
"time"
"github.com/golang/glog"
clientmodel "github.com/prometheus/client_golang/model"
)
const (
// The maximum number of concurrent send requests to the TSDB.
maxConcurrentSends = 10
// The maximum number of samples to fit into a single request to the TSDB.
maxSamplesPerSend = 100
// The deadline after which to send queued samples even if the maximum batch
// size has not been reached.
batchSendDeadline = 5 * time.Second
)
// TSDBClient defines an interface for sending a batch of samples to an
// external timeseries database (TSDB).
type TSDBClient interface {
Store(clientmodel.Samples) error
}
// TSDBQueueManager manages a queue of samples to be sent to the TSDB indicated
// by the provided TSDBClient.
type TSDBQueueManager struct {
tsdb TSDBClient
queue chan clientmodel.Samples
pendingSamples clientmodel.Samples
sendSemaphore chan bool
drained chan bool
}
// Build a new TSDBQueueManager.
func NewTSDBQueueManager(tsdb TSDBClient, queueCapacity int) *TSDBQueueManager {
return &TSDBQueueManager{
tsdb: tsdb,
queue: make(chan clientmodel.Samples, queueCapacity),
sendSemaphore: make(chan bool, maxConcurrentSends),
drained: make(chan bool),
}
}
// Queue a sample batch to be sent to the TSDB. This drops the most recently
// queued samples on the floor if the queue is full.
func (t *TSDBQueueManager) Queue(s clientmodel.Samples) {
select {
case t.queue <- s:
default:
samplesCount.IncrementBy(map[string]string{result: dropped}, float64(len(s)))
glog.Warningf("TSDB queue full, discarding %d samples", len(s))
}
}
func (t *TSDBQueueManager) sendSamples(s clientmodel.Samples) {
t.sendSemaphore <- true
defer func() {
<-t.sendSemaphore
}()
// Samples are sent to the TSDB on a best-effort basis. If a sample isn't
// sent correctly the first time, it's simply dropped on the floor.
begin := time.Now()
err := t.tsdb.Store(s)
recordOutcome(time.Since(begin), len(s), err)
if err != nil {
glog.Warningf("error sending %d samples to TSDB: %s", len(s), err)
}
}
// Report notification queue occupancy and capacity.
func (t *TSDBQueueManager) reportQueues() {
queueSize.Set(map[string]string{facet: occupancy}, float64(len(t.queue)))
queueSize.Set(map[string]string{facet: capacity}, float64(cap(t.queue)))
}
// Continuously send samples to the TSDB.
func (t *TSDBQueueManager) Run() {
defer func() {
close(t.drained)
}()
queueReportTicker := time.NewTicker(time.Second)
go func() {
for _ = range queueReportTicker.C {
t.reportQueues()
}
}()
defer queueReportTicker.Stop()
// Send batches of at most maxSamplesPerSend samples to the TSDB. If we
// have fewer samples than that, flush them out after a deadline anyways.
for {
select {
case s, ok := <-t.queue:
if !ok {
glog.Infof("Flushing %d samples to OpenTSDB...", len(t.pendingSamples))
t.flush()
glog.Infof("Done flushing.")
return
}
t.pendingSamples = append(t.pendingSamples, s...)
for len(t.pendingSamples) >= maxSamplesPerSend {
go t.sendSamples(t.pendingSamples[:maxSamplesPerSend])
t.pendingSamples = t.pendingSamples[maxSamplesPerSend:]
}
case <-time.After(batchSendDeadline):
t.flush()
}
}
}
// Flush remaining queued samples.
func (t *TSDBQueueManager) flush() {
if len(t.pendingSamples) > 0 {
go t.sendSamples(t.pendingSamples)
}
t.pendingSamples = t.pendingSamples[:0]
}
// Stop sending samples to the TSDB and wait for pending sends to complete.
func (t *TSDBQueueManager) Close() {
glog.Infof("TSDB queue manager shutting down...")
close(t.queue)
<-t.drained
for i := 0; i < maxConcurrentSends; i++ {
t.sendSemaphore <- true
}
}

View file

@ -0,0 +1,77 @@
// Copyright 2013 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package remote
import (
"sync"
"testing"
clientmodel "github.com/prometheus/client_golang/model"
)
type TestTSDBClient struct {
receivedSamples clientmodel.Samples
expectedSamples clientmodel.Samples
wg sync.WaitGroup
}
func (c *TestTSDBClient) expectSamples(s clientmodel.Samples) {
c.expectedSamples = append(c.expectedSamples, s...)
c.wg.Add(len(s))
}
func (c *TestTSDBClient) waitForExpectedSamples(t *testing.T) {
c.wg.Wait()
for i, expected := range c.expectedSamples {
if !expected.Equal(c.receivedSamples[i]) {
t.Fatalf("%d. Expected %v, got %v", i, expected, c.receivedSamples[i])
}
}
}
func (c *TestTSDBClient) Store(s clientmodel.Samples) error {
c.receivedSamples = append(c.receivedSamples, s...)
c.wg.Add(-len(s))
return nil
}
func TestSampleDelivery(t *testing.T) {
// Let's create an even number of send batches so we don't run into the
// batch timeout case.
n := maxSamplesPerSend * 2
samples := make(clientmodel.Samples, 0, n)
for i := 0; i < n; i++ {
samples = append(samples, &clientmodel.Sample{
Metric: clientmodel.Metric{
clientmodel.MetricNameLabel: "test_metric",
},
Value: clientmodel.SampleValue(i),
})
}
c := &TestTSDBClient{}
c.expectSamples(samples[:len(samples)/2])
m := NewTSDBQueueManager(c, 1)
// These should be received by the client.
m.Queue(samples[:len(samples)/2])
// These will be dropped because the queue is full.
m.Queue(samples[len(samples)/2:])
go m.Run()
defer m.Close()
c.waitForExpectedSamples(t)
}