Add experimental InfluxDB write support.

This commit is contained in:
Julius Volz 2015-03-30 02:42:04 +02:00
parent 52d89b2173
commit 61fb688dd9
7 changed files with 339 additions and 63 deletions

48
main.go
View file

@ -34,6 +34,7 @@ import (
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/storage/local"
"github.com/prometheus/prometheus/storage/remote"
"github.com/prometheus/prometheus/storage/remote/influxdb"
"github.com/prometheus/prometheus/storage/remote/opentsdb"
"github.com/prometheus/prometheus/web"
"github.com/prometheus/prometheus/web/api"
@ -50,8 +51,10 @@ var (
persistenceStoragePath = flag.String("storage.local.path", "/tmp/metrics", "Base path for metrics storage.")
remoteTSDBUrl = flag.String("storage.remote.url", "", "The URL of the OpenTSDB instance to send samples to.")
remoteTSDBTimeout = flag.Duration("storage.remote.timeout", 30*time.Second, "The timeout to use when sending samples to OpenTSDB.")
remoteStorageType = flag.String("storage.remote.type", "", "The type of remote storage to use. Valid values: 'opentsdb', 'influxdb'. If this flag is left empty, no remote storage is used.")
opentsdbURL = flag.String("storage.remote.opentsdb-url", "", "The URL of the remote OpenTSDB server to send samples to.")
influxdbURL = flag.String("storage.remote.influxdb-url", "", "The URL of the remote InfluxDB server to send samples to.")
remoteStorageTimeout = flag.Duration("storage.remote.timeout", 30*time.Second, "The timeout to use when sending samples to the remote storage.")
numMemoryChunks = flag.Int("storage.local.memory-chunks", 1024*1024, "How many chunks to keep in memory. While the size of a chunk is 1kiB, the total memory usage will be significantly higher than this value * 1kiB. Furthermore, for various reasons, more chunks might have to be kept in memory temporarily.")
@ -73,7 +76,7 @@ type prometheus struct {
targetManager retrieval.TargetManager
notificationHandler *notification.NotificationHandler
storage local.Storage
remoteTSDBQueue *remote.TSDBQueueManager
remoteStorageQueue *remote.StorageQueueManager
webService *web.WebService
@ -119,15 +122,24 @@ func NewPrometheus() *prometheus {
}
var sampleAppender storage.SampleAppender
var remoteTSDBQueue *remote.TSDBQueueManager
if *remoteTSDBUrl == "" {
glog.Warningf("No TSDB URL provided; not sending any samples to long-term storage")
var remoteStorageQueue *remote.StorageQueueManager
if *remoteStorageType == "" {
glog.Warningf("No remote storage implementation selected; not sending any samples to long-term storage")
sampleAppender = memStorage
} else {
openTSDB := opentsdb.NewClient(*remoteTSDBUrl, *remoteTSDBTimeout)
remoteTSDBQueue = remote.NewTSDBQueueManager(openTSDB, 100*1024)
var c remote.StorageClient
switch *remoteStorageType {
case "opentsdb":
c = opentsdb.NewClient(*opentsdbURL, *remoteStorageTimeout)
case "influxdb":
c = influxdb.NewClient(*influxdbURL, *remoteStorageTimeout)
default:
glog.Fatalf("Invalid flag value for 'storage.remote.type': %s", *remoteStorageType)
}
remoteStorageQueue = remote.NewStorageQueueManager(c, 100*1024)
sampleAppender = storage.Tee{
Appender1: remoteTSDBQueue,
Appender1: remoteStorageQueue,
Appender2: memStorage,
}
}
@ -184,7 +196,7 @@ func NewPrometheus() *prometheus {
targetManager: targetManager,
notificationHandler: notificationHandler,
storage: memStorage,
remoteTSDBQueue: remoteTSDBQueue,
remoteStorageQueue: remoteStorageQueue,
webService: webService,
}
@ -196,8 +208,8 @@ func NewPrometheus() *prometheus {
// down. The method installs an interrupt handler, allowing to trigger a
// shutdown by sending SIGTERM to the process.
func (p *prometheus) Serve() {
if p.remoteTSDBQueue != nil {
go p.remoteTSDBQueue.Run()
if p.remoteStorageQueue != nil {
go p.remoteStorageQueue.Run()
}
go p.ruleManager.Run()
go p.notificationHandler.Run()
@ -227,8 +239,8 @@ func (p *prometheus) Serve() {
glog.Error("Error stopping local storage: ", err)
}
if p.remoteTSDBQueue != nil {
p.remoteTSDBQueue.Stop()
if p.remoteStorageQueue != nil {
p.remoteStorageQueue.Stop()
}
p.notificationHandler.Stop()
@ -239,8 +251,8 @@ func (p *prometheus) Serve() {
func (p *prometheus) Describe(ch chan<- *registry.Desc) {
p.notificationHandler.Describe(ch)
p.storage.Describe(ch)
if p.remoteTSDBQueue != nil {
p.remoteTSDBQueue.Describe(ch)
if p.remoteStorageQueue != nil {
p.remoteStorageQueue.Describe(ch)
}
}
@ -248,8 +260,8 @@ func (p *prometheus) Describe(ch chan<- *registry.Desc) {
func (p *prometheus) Collect(ch chan<- registry.Metric) {
p.notificationHandler.Collect(ch)
p.storage.Collect(ch)
if p.remoteTSDBQueue != nil {
p.remoteTSDBQueue.Collect(ch)
if p.remoteStorageQueue != nil {
p.remoteStorageQueue.Collect(ch)
}
}

View file

@ -0,0 +1,159 @@
// Copyright 2015 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package influxdb
import (
"bytes"
"encoding/json"
"flag"
"fmt"
"io/ioutil"
"math"
"net/http"
"net/url"
"time"
"github.com/golang/glog"
clientmodel "github.com/prometheus/client_golang/model"
"github.com/prometheus/prometheus/utility"
)
const (
writeEndpoint = "/write"
contentTypeJSON = "application/json"
)
var (
retentionPolicy = flag.String("storage.remote.influxdb.retention-policy", "default", "The InfluxDB retention policy to use.")
database = flag.String("storage.remote.influxdb.database", "prometheus", "The name of the database to use for storing samples in InfluxDB.")
)
// Client allows sending batches of Prometheus samples to InfluxDB.
type Client struct {
url string
httpClient *http.Client
}
// NewClient creates a new Client.
func NewClient(url string, timeout time.Duration) *Client {
return &Client{
url: url,
httpClient: utility.NewDeadlineClient(timeout),
}
}
// StoreSamplesRequest is used for building a JSON request for storing samples
// in InfluxDB.
type StoreSamplesRequest struct {
Database string `json:"database"`
RetentionPolicy string `json:"retentionPolicy"`
Points []point `json:"points"`
}
// point represents a single InfluxDB measurement.
type point struct {
Timestamp int64 `json:"timestamp"`
Precision string `json:"precision"`
Name clientmodel.LabelValue `json:"name"`
Tags clientmodel.LabelSet `json:"tags"`
Fields fields `json:"fields"`
}
// fields represents the fields/columns sent to InfluxDB for a given measurement.
type fields struct {
Value clientmodel.SampleValue `json:"value"`
}
// tagsFromMetric extracts InfluxDB tags from a Prometheus metric.
func tagsFromMetric(m clientmodel.Metric) clientmodel.LabelSet {
tags := make(clientmodel.LabelSet, len(m)-1)
for l, v := range m {
if l == clientmodel.MetricNameLabel {
continue
}
tags[l] = v
}
return tags
}
// Store sends a batch of samples to InfluxDB via its HTTP API.
func (c *Client) Store(samples clientmodel.Samples) error {
points := make([]point, 0, len(samples))
for _, s := range samples {
v := float64(s.Value)
if math.IsNaN(v) || math.IsInf(v, 0) {
// TODO(julius): figure out if it's possible to insert special float
// values into InfluxDB somehow.
glog.Warningf("cannot send value %f to InfluxDB, skipping sample %#v", v, s)
continue
}
metric := s.Metric[clientmodel.MetricNameLabel]
points = append(points, point{
Timestamp: s.Timestamp.UnixNano(),
Precision: "n",
Name: metric,
Tags: tagsFromMetric(s.Metric),
Fields: fields{
Value: s.Value,
},
})
}
u, err := url.Parse(c.url)
if err != nil {
return err
}
u.Path = writeEndpoint
req := StoreSamplesRequest{
Database: *database,
RetentionPolicy: *retentionPolicy,
Points: points,
}
buf, err := json.Marshal(req)
if err != nil {
return err
}
resp, err := c.httpClient.Post(
u.String(),
contentTypeJSON,
bytes.NewBuffer(buf),
)
if err != nil {
return err
}
defer resp.Body.Close()
// API returns status code 200 for successful writes.
// http://influxdb.com/docs/v0.9/concepts/reading_and_writing_data.html#response
if resp.StatusCode == http.StatusOK {
return nil
}
// API returns error details in the response content in JSON.
buf, err = ioutil.ReadAll(resp.Body)
if err != nil {
return err
}
var r map[string]string
if err := json.Unmarshal(buf, &r); err != nil {
return err
}
return fmt.Errorf("failed to write samples into InfluxDB. Error: %s", r["error"])
}

View file

@ -0,0 +1,88 @@
// Copyright 2015 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package influxdb
import (
"io/ioutil"
"math"
"net/http"
"net/http/httptest"
"testing"
"time"
clientmodel "github.com/prometheus/client_golang/model"
)
func TestClient(t *testing.T) {
samples := clientmodel.Samples{
{
Metric: clientmodel.Metric{
clientmodel.MetricNameLabel: "testmetric",
"test_label": "test_label_value1",
},
Timestamp: clientmodel.Timestamp(123456789123),
Value: 1.23,
},
{
Metric: clientmodel.Metric{
clientmodel.MetricNameLabel: "testmetric",
"test_label": "test_label_value2",
},
Timestamp: clientmodel.Timestamp(123456789123),
Value: 5.1234,
},
{
Metric: clientmodel.Metric{
clientmodel.MetricNameLabel: "special_float_value",
},
Timestamp: clientmodel.Timestamp(123456789123),
Value: clientmodel.SampleValue(math.NaN()),
},
}
expectedJSON := `{"database":"prometheus","retentionPolicy":"default","points":[{"timestamp":123456789123000000,"precision":"n","name":"testmetric","tags":{"test_label":"test_label_value1"},"fields":{"value":"1.23"}},{"timestamp":123456789123000000,"precision":"n","name":"testmetric","tags":{"test_label":"test_label_value2"},"fields":{"value":"5.1234"}}]}`
server := httptest.NewServer(http.HandlerFunc(
func(w http.ResponseWriter, r *http.Request) {
if r.Method != "POST" {
t.Fatalf("Unexpected method; expected POST, got %s", r.Method)
}
if r.URL.Path != writeEndpoint {
t.Fatalf("Unexpected path; expected %s, got %s", writeEndpoint, r.URL.Path)
}
ct := r.Header["Content-Type"]
if len(ct) != 1 {
t.Fatalf("Unexpected number of 'Content-Type' headers; got %d, want 1", len(ct))
}
if ct[0] != contentTypeJSON {
t.Fatalf("Unexpected 'Content-type'; expected %s, got %s", contentTypeJSON, ct[0])
}
b, err := ioutil.ReadAll(r.Body)
if err != nil {
t.Fatalf("Error reading body: %s", err)
}
if string(b) != expectedJSON {
t.Fatalf("Unexpected request body; expected:\n\n%s\n\ngot:\n\n%s", expectedJSON, string(b))
}
},
))
defer server.Close()
c := NewClient(server.URL, time.Minute)
if err := c.Store(samples); err != nil {
t.Fatalf("Error sending samples: %s", err)
}
}

View file

@ -1,3 +1,16 @@
// Copyright 2013 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package opentsdb
import (
@ -12,6 +25,7 @@ import (
"time"
"github.com/golang/glog"
clientmodel "github.com/prometheus/client_golang/model"
"github.com/prometheus/prometheus/utility"
@ -67,7 +81,7 @@ func (c *Client) Store(samples clientmodel.Samples) error {
for _, s := range samples {
v := float64(s.Value)
if math.IsNaN(v) || math.IsInf(v, 0) {
glog.Warningf("cannot send value %d to OpenTSDB, skipping sample %#v", v, s)
glog.Warningf("cannot send value %f to OpenTSDB, skipping sample %#v", v, s)
continue
}
metric := TagValue(s.Metric[clientmodel.MetricNameLabel])

View file

@ -64,11 +64,11 @@ func TestMarshalStoreSamplesRequest(t *testing.T) {
var unmarshaledRequest StoreSamplesRequest
err = json.Unmarshal(expectedJSON, &unmarshaledRequest)
if err != nil {
t.Fatalf("Unarshal(expectedJSON, &unmarshaledRequest) resulted in err: %s", err)
t.Fatalf("Unmarshal(expectedJSON, &unmarshaledRequest) resulted in err: %s", err)
}
if !reflect.DeepEqual(unmarshaledRequest, request) {
t.Errorf(
"Unarshal(expectedJSON, &unmarshaledRequest) => %#v, want %#v",
"Unmarshal(expectedJSON, &unmarshaledRequest) => %#v, want %#v",
unmarshaledRequest, request,
)
}

View file

@ -23,9 +23,9 @@ import (
)
const (
// The maximum number of concurrent send requests to the TSDB.
// The maximum number of concurrent send requests to the remote storage.
maxConcurrentSends = 10
// The maximum number of samples to fit into a single request to the TSDB.
// The maximum number of samples to fit into a single request to the remote storage.
maxSamplesPerSend = 100
// The deadline after which to send queued samples even if the maximum batch
// size has not been reached.
@ -43,16 +43,16 @@ const (
dropped = "dropped"
)
// TSDBClient defines an interface for sending a batch of samples to an
// external timeseries database (TSDB).
type TSDBClient interface {
// StorageClient defines an interface for sending a batch of samples to an
// external timeseries database.
type StorageClient interface {
Store(clientmodel.Samples) error
}
// TSDBQueueManager manages a queue of samples to be sent to the TSDB indicated
// by the provided TSDBClient.
type TSDBQueueManager struct {
tsdb TSDBClient
// StorageQueueManager manages a queue of samples to be sent to the Storage
// indicated by the provided StorageClient.
type StorageQueueManager struct {
tsdb StorageClient
queue chan *clientmodel.Sample
pendingSamples clientmodel.Samples
sendSemaphore chan bool
@ -65,9 +65,9 @@ type TSDBQueueManager struct {
queueCapacity prometheus.Metric
}
// NewTSDBQueueManager builds a new TSDBQueueManager.
func NewTSDBQueueManager(tsdb TSDBClient, queueCapacity int) *TSDBQueueManager {
return &TSDBQueueManager{
// NewStorageQueueManager builds a new StorageQueueManager.
func NewStorageQueueManager(tsdb StorageClient, queueCapacity int) *StorageQueueManager {
return &StorageQueueManager{
tsdb: tsdb,
queue: make(chan *clientmodel.Sample, queueCapacity),
sendSemaphore: make(chan bool, maxConcurrentSends),
@ -78,7 +78,7 @@ func NewTSDBQueueManager(tsdb TSDBClient, queueCapacity int) *TSDBQueueManager {
Namespace: namespace,
Subsystem: subsystem,
Name: "sent_samples_total",
Help: "Total number of processed samples to be sent to remote TSDB.",
Help: "Total number of processed samples to be sent to remote storage.",
},
[]string{result},
),
@ -86,24 +86,24 @@ func NewTSDBQueueManager(tsdb TSDBClient, queueCapacity int) *TSDBQueueManager {
Namespace: namespace,
Subsystem: subsystem,
Name: "sent_latency_milliseconds",
Help: "Latency quantiles for sending sample batches to the remote TSDB.",
Help: "Latency quantiles for sending sample batches to the remote storage.",
}),
sendErrors: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "sent_errors_total",
Help: "Total number of errors sending sample batches to the remote TSDB.",
Help: "Total number of errors sending sample batches to the remote storage.",
}),
queueLength: prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "queue_length",
Help: "The number of processed samples queued to be sent to the remote TSDB.",
Help: "The number of processed samples queued to be sent to the remote storage.",
}),
queueCapacity: prometheus.MustNewConstMetric(
prometheus.NewDesc(
prometheus.BuildFQName(namespace, subsystem, "queue_capacity"),
"The capacity of the queue of samples to be sent to the remote TSDB.",
"The capacity of the queue of samples to be sent to the remote storage.",
nil, nil,
),
prometheus.GaugeValue,
@ -112,20 +112,21 @@ func NewTSDBQueueManager(tsdb TSDBClient, queueCapacity int) *TSDBQueueManager {
}
}
// Append queues a sample to be sent to the TSDB. It drops the sample on the
// floor if the queue is full. It implements storage.SampleAppender.
func (t *TSDBQueueManager) Append(s *clientmodel.Sample) {
// Append queues a sample to be sent to the remote storage. It drops the
// sample on the floor if the queue is full. It implements
// storage.SampleAppender.
func (t *StorageQueueManager) Append(s *clientmodel.Sample) {
select {
case t.queue <- s:
default:
t.samplesCount.WithLabelValues(dropped).Inc()
glog.Warning("TSDB queue full, discarding sample.")
glog.Warning("Remote storage queue full, discarding sample.")
}
}
// Stop stops sending samples to the TSDB and waits for pending sends to
// complete.
func (t *TSDBQueueManager) Stop() {
// Stop stops sending samples to the remote storage and waits for pending
// sends to complete.
func (t *StorageQueueManager) Stop() {
glog.Infof("Stopping remote storage...")
close(t.queue)
<-t.drained
@ -136,7 +137,7 @@ func (t *TSDBQueueManager) Stop() {
}
// Describe implements prometheus.Collector.
func (t *TSDBQueueManager) Describe(ch chan<- *prometheus.Desc) {
func (t *StorageQueueManager) Describe(ch chan<- *prometheus.Desc) {
t.samplesCount.Describe(ch)
t.sendLatency.Describe(ch)
ch <- t.queueLength.Desc()
@ -144,7 +145,7 @@ func (t *TSDBQueueManager) Describe(ch chan<- *prometheus.Desc) {
}
// Collect implements prometheus.Collector.
func (t *TSDBQueueManager) Collect(ch chan<- prometheus.Metric) {
func (t *StorageQueueManager) Collect(ch chan<- prometheus.Metric) {
t.samplesCount.Collect(ch)
t.sendLatency.Collect(ch)
t.queueLength.Set(float64(len(t.queue)))
@ -152,21 +153,22 @@ func (t *TSDBQueueManager) Collect(ch chan<- prometheus.Metric) {
ch <- t.queueCapacity
}
func (t *TSDBQueueManager) sendSamples(s clientmodel.Samples) {
func (t *StorageQueueManager) sendSamples(s clientmodel.Samples) {
t.sendSemaphore <- true
defer func() {
<-t.sendSemaphore
}()
// Samples are sent to the TSDB on a best-effort basis. If a sample isn't
// sent correctly the first time, it's simply dropped on the floor.
// Samples are sent to the remote storage on a best-effort basis. If a
// sample isn't sent correctly the first time, it's simply dropped on the
// floor.
begin := time.Now()
err := t.tsdb.Store(s)
duration := time.Since(begin) / time.Millisecond
labelValue := success
if err != nil {
glog.Warningf("error sending %d samples to TSDB: %s", len(s), err)
glog.Warningf("error sending %d samples to remote storage: %s", len(s), err)
labelValue = failure
t.sendErrors.Inc()
}
@ -174,19 +176,20 @@ func (t *TSDBQueueManager) sendSamples(s clientmodel.Samples) {
t.sendLatency.Observe(float64(duration))
}
// Run continuously sends samples to the TSDB.
func (t *TSDBQueueManager) Run() {
// Run continuously sends samples to the remote storage.
func (t *StorageQueueManager) Run() {
defer func() {
close(t.drained)
}()
// Send batches of at most maxSamplesPerSend samples to the TSDB. If we
// have fewer samples than that, flush them out after a deadline anyways.
// Send batches of at most maxSamplesPerSend samples to the remote storage.
// If we have fewer samples than that, flush them out after a deadline
// anyways.
for {
select {
case s, ok := <-t.queue:
if !ok {
glog.Infof("Flushing %d samples to OpenTSDB...", len(t.pendingSamples))
glog.Infof("Flushing %d samples to remote storage...", len(t.pendingSamples))
t.flush()
glog.Infof("Done flushing.")
return
@ -205,7 +208,7 @@ func (t *TSDBQueueManager) Run() {
}
// Flush flushes remaining queued samples.
func (t *TSDBQueueManager) flush() {
func (t *StorageQueueManager) flush() {
if len(t.pendingSamples) > 0 {
go t.sendSamples(t.pendingSamples)
}

View file

@ -20,18 +20,18 @@ import (
clientmodel "github.com/prometheus/client_golang/model"
)
type TestTSDBClient struct {
type TestStorageClient struct {
receivedSamples clientmodel.Samples
expectedSamples clientmodel.Samples
wg sync.WaitGroup
}
func (c *TestTSDBClient) expectSamples(s clientmodel.Samples) {
func (c *TestStorageClient) expectSamples(s clientmodel.Samples) {
c.expectedSamples = append(c.expectedSamples, s...)
c.wg.Add(len(s))
}
func (c *TestTSDBClient) waitForExpectedSamples(t *testing.T) {
func (c *TestStorageClient) waitForExpectedSamples(t *testing.T) {
c.wg.Wait()
for i, expected := range c.expectedSamples {
if !expected.Equal(c.receivedSamples[i]) {
@ -40,7 +40,7 @@ func (c *TestTSDBClient) waitForExpectedSamples(t *testing.T) {
}
}
func (c *TestTSDBClient) Store(s clientmodel.Samples) error {
func (c *TestStorageClient) Store(s clientmodel.Samples) error {
c.receivedSamples = append(c.receivedSamples, s...)
c.wg.Add(-len(s))
return nil
@ -61,9 +61,9 @@ func TestSampleDelivery(t *testing.T) {
})
}
c := &TestTSDBClient{}
c := &TestStorageClient{}
c.expectSamples(samples[:len(samples)/2])
m := NewTSDBQueueManager(c, len(samples)/2)
m := NewStorageQueueManager(c, len(samples)/2)
// These should be received by the client.
for _, s := range samples[:len(samples)/2] {