diff --git a/main.go b/main.go index 50e6aa061..4b0705d56 100644 --- a/main.go +++ b/main.go @@ -35,6 +35,7 @@ import ( "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/storage/local" "github.com/prometheus/prometheus/storage/remote" + "github.com/prometheus/prometheus/storage/remote/influxdb" "github.com/prometheus/prometheus/storage/remote/opentsdb" "github.com/prometheus/prometheus/web" "github.com/prometheus/prometheus/web/api" @@ -51,8 +52,9 @@ var ( persistenceStoragePath = flag.String("storage.local.path", "/tmp/metrics", "Base path for metrics storage.") - remoteTSDBUrl = flag.String("storage.remote.url", "", "The URL of the OpenTSDB instance to send samples to.") - remoteTSDBTimeout = flag.Duration("storage.remote.timeout", 30*time.Second, "The timeout to use when sending samples to OpenTSDB.") + opentsdbURL = flag.String("storage.remote.opentsdb-url", "", "The URL of the remote OpenTSDB server to send samples to. None, if empty.") + influxdbURL = flag.String("storage.remote.influxdb-url", "", "The URL of the remote InfluxDB server to send samples to. None, if empty.") + remoteStorageTimeout = flag.Duration("storage.remote.timeout", 30*time.Second, "The timeout to use when sending samples to the remote storage.") numMemoryChunks = flag.Int("storage.local.memory-chunks", 1024*1024, "How many chunks to keep in memory. While the size of a chunk is 1kiB, the total memory usage will be significantly higher than this value * 1kiB. Furthermore, for various reasons, more chunks might have to be kept in memory temporarily.") @@ -76,7 +78,7 @@ type prometheus struct { targetManager retrieval.TargetManager notificationHandler *notification.NotificationHandler storage local.Storage - remoteTSDBQueue *remote.TSDBQueueManager + remoteStorageQueues []*remote.StorageQueueManager webService *web.WebService @@ -122,17 +124,27 @@ func NewPrometheus() *prometheus { } var sampleAppender storage.SampleAppender - var remoteTSDBQueue *remote.TSDBQueueManager - if *remoteTSDBUrl == "" { - glog.Warningf("No TSDB URL provided; not sending any samples to long-term storage") + var remoteStorageQueues []*remote.StorageQueueManager + if *opentsdbURL == "" && *influxdbURL == "" { + glog.Warningf("No remote storage URLs provided; not sending any samples to long-term storage") sampleAppender = memStorage } else { - openTSDB := opentsdb.NewClient(*remoteTSDBUrl, *remoteTSDBTimeout) - remoteTSDBQueue = remote.NewTSDBQueueManager(openTSDB, 100*1024) - sampleAppender = storage.Tee{ - Appender1: remoteTSDBQueue, - Appender2: memStorage, + fanout := storage.Fanout{memStorage} + + addRemoteStorage := func(c remote.StorageClient) { + qm := remote.NewStorageQueueManager(c, 100*1024) + fanout = append(fanout, qm) + remoteStorageQueues = append(remoteStorageQueues, qm) } + + if *opentsdbURL != "" { + addRemoteStorage(opentsdb.NewClient(*opentsdbURL, *remoteStorageTimeout)) + } + if *influxdbURL != "" { + addRemoteStorage(influxdb.NewClient(*influxdbURL, *remoteStorageTimeout)) + } + + sampleAppender = fanout } targetManager := retrieval.NewTargetManager(sampleAppender, conf.GlobalLabels()) @@ -196,7 +208,7 @@ func NewPrometheus() *prometheus { targetManager: targetManager, notificationHandler: notificationHandler, storage: memStorage, - remoteTSDBQueue: remoteTSDBQueue, + remoteStorageQueues: remoteStorageQueues, webService: webService, } @@ -208,8 +220,8 @@ func NewPrometheus() *prometheus { // down. The method installs an interrupt handler, allowing to trigger a // shutdown by sending SIGTERM to the process. func (p *prometheus) Serve() { - if p.remoteTSDBQueue != nil { - go p.remoteTSDBQueue.Run() + for _, q := range p.remoteStorageQueues { + go q.Run() } go p.ruleManager.Run() go p.notificationHandler.Run() @@ -239,8 +251,8 @@ func (p *prometheus) Serve() { glog.Error("Error stopping local storage: ", err) } - if p.remoteTSDBQueue != nil { - p.remoteTSDBQueue.Stop() + for _, q := range p.remoteStorageQueues { + q.Stop() } p.notificationHandler.Stop() @@ -251,8 +263,8 @@ func (p *prometheus) Serve() { func (p *prometheus) Describe(ch chan<- *registry.Desc) { p.notificationHandler.Describe(ch) p.storage.Describe(ch) - if p.remoteTSDBQueue != nil { - p.remoteTSDBQueue.Describe(ch) + for _, q := range p.remoteStorageQueues { + q.Describe(ch) } } @@ -260,8 +272,8 @@ func (p *prometheus) Describe(ch chan<- *registry.Desc) { func (p *prometheus) Collect(ch chan<- registry.Metric) { p.notificationHandler.Collect(ch) p.storage.Collect(ch) - if p.remoteTSDBQueue != nil { - p.remoteTSDBQueue.Collect(ch) + for _, q := range p.remoteStorageQueues { + q.Collect(ch) } } diff --git a/storage/remote/influxdb/client.go b/storage/remote/influxdb/client.go new file mode 100644 index 000000000..ac2acad33 --- /dev/null +++ b/storage/remote/influxdb/client.go @@ -0,0 +1,164 @@ +// Copyright 2015 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package influxdb + +import ( + "bytes" + "encoding/json" + "flag" + "fmt" + "io/ioutil" + "math" + "net/http" + "net/url" + "time" + + "github.com/golang/glog" + + clientmodel "github.com/prometheus/client_golang/model" + + "github.com/prometheus/prometheus/utility" +) + +const ( + writeEndpoint = "/write" + contentTypeJSON = "application/json" +) + +var ( + retentionPolicy = flag.String("storage.remote.influxdb.retention-policy", "default", "The InfluxDB retention policy to use.") + database = flag.String("storage.remote.influxdb.database", "prometheus", "The name of the database to use for storing samples in InfluxDB.") +) + +// Client allows sending batches of Prometheus samples to InfluxDB. +type Client struct { + url string + httpClient *http.Client +} + +// NewClient creates a new Client. +func NewClient(url string, timeout time.Duration) *Client { + return &Client{ + url: url, + httpClient: utility.NewDeadlineClient(timeout), + } +} + +// StoreSamplesRequest is used for building a JSON request for storing samples +// in InfluxDB. +type StoreSamplesRequest struct { + Database string `json:"database"` + RetentionPolicy string `json:"retentionPolicy"` + Points []point `json:"points"` +} + +// point represents a single InfluxDB measurement. +type point struct { + Timestamp int64 `json:"timestamp"` + Precision string `json:"precision"` + Name clientmodel.LabelValue `json:"name"` + Tags clientmodel.LabelSet `json:"tags"` + Fields fields `json:"fields"` +} + +// fields represents the fields/columns sent to InfluxDB for a given measurement. +type fields struct { + Value clientmodel.SampleValue `json:"value"` +} + +// tagsFromMetric extracts InfluxDB tags from a Prometheus metric. +func tagsFromMetric(m clientmodel.Metric) clientmodel.LabelSet { + tags := make(clientmodel.LabelSet, len(m)-1) + for l, v := range m { + if l == clientmodel.MetricNameLabel { + continue + } + tags[l] = v + } + return tags +} + +// Store sends a batch of samples to InfluxDB via its HTTP API. +func (c *Client) Store(samples clientmodel.Samples) error { + points := make([]point, 0, len(samples)) + for _, s := range samples { + v := float64(s.Value) + if math.IsNaN(v) || math.IsInf(v, 0) { + // TODO(julius): figure out if it's possible to insert special float + // values into InfluxDB somehow. + glog.Warningf("cannot send value %f to InfluxDB, skipping sample %#v", v, s) + continue + } + metric := s.Metric[clientmodel.MetricNameLabel] + points = append(points, point{ + Timestamp: s.Timestamp.UnixNano(), + Precision: "n", + Name: metric, + Tags: tagsFromMetric(s.Metric), + Fields: fields{ + Value: s.Value, + }, + }) + } + + u, err := url.Parse(c.url) + if err != nil { + return err + } + + u.Path = writeEndpoint + + req := StoreSamplesRequest{ + Database: *database, + RetentionPolicy: *retentionPolicy, + Points: points, + } + buf, err := json.Marshal(req) + if err != nil { + return err + } + + resp, err := c.httpClient.Post( + u.String(), + contentTypeJSON, + bytes.NewBuffer(buf), + ) + if err != nil { + return err + } + defer resp.Body.Close() + + // API returns status code 200 for successful writes. + // http://influxdb.com/docs/v0.9/concepts/reading_and_writing_data.html#response + if resp.StatusCode == http.StatusOK { + return nil + } + + // API returns error details in the response content in JSON. + buf, err = ioutil.ReadAll(resp.Body) + if err != nil { + return err + } + + var r map[string]string + if err := json.Unmarshal(buf, &r); err != nil { + return err + } + return fmt.Errorf("failed to write samples into InfluxDB. Error: %s", r["error"]) +} + +// Name identifies the client as an InfluxDB client. +func (c Client) Name() string { + return "influxdb" +} diff --git a/storage/remote/influxdb/client_test.go b/storage/remote/influxdb/client_test.go new file mode 100644 index 000000000..ae8329826 --- /dev/null +++ b/storage/remote/influxdb/client_test.go @@ -0,0 +1,88 @@ +// Copyright 2015 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package influxdb + +import ( + "io/ioutil" + "math" + "net/http" + "net/http/httptest" + "testing" + "time" + + clientmodel "github.com/prometheus/client_golang/model" +) + +func TestClient(t *testing.T) { + samples := clientmodel.Samples{ + { + Metric: clientmodel.Metric{ + clientmodel.MetricNameLabel: "testmetric", + "test_label": "test_label_value1", + }, + Timestamp: clientmodel.Timestamp(123456789123), + Value: 1.23, + }, + { + Metric: clientmodel.Metric{ + clientmodel.MetricNameLabel: "testmetric", + "test_label": "test_label_value2", + }, + Timestamp: clientmodel.Timestamp(123456789123), + Value: 5.1234, + }, + { + Metric: clientmodel.Metric{ + clientmodel.MetricNameLabel: "special_float_value", + }, + Timestamp: clientmodel.Timestamp(123456789123), + Value: clientmodel.SampleValue(math.NaN()), + }, + } + + expectedJSON := `{"database":"prometheus","retentionPolicy":"default","points":[{"timestamp":123456789123000000,"precision":"n","name":"testmetric","tags":{"test_label":"test_label_value1"},"fields":{"value":"1.23"}},{"timestamp":123456789123000000,"precision":"n","name":"testmetric","tags":{"test_label":"test_label_value2"},"fields":{"value":"5.1234"}}]}` + + server := httptest.NewServer(http.HandlerFunc( + func(w http.ResponseWriter, r *http.Request) { + if r.Method != "POST" { + t.Fatalf("Unexpected method; expected POST, got %s", r.Method) + } + if r.URL.Path != writeEndpoint { + t.Fatalf("Unexpected path; expected %s, got %s", writeEndpoint, r.URL.Path) + } + ct := r.Header["Content-Type"] + if len(ct) != 1 { + t.Fatalf("Unexpected number of 'Content-Type' headers; got %d, want 1", len(ct)) + } + if ct[0] != contentTypeJSON { + t.Fatalf("Unexpected 'Content-type'; expected %s, got %s", contentTypeJSON, ct[0]) + } + b, err := ioutil.ReadAll(r.Body) + if err != nil { + t.Fatalf("Error reading body: %s", err) + } + + if string(b) != expectedJSON { + t.Fatalf("Unexpected request body; expected:\n\n%s\n\ngot:\n\n%s", expectedJSON, string(b)) + } + }, + )) + defer server.Close() + + c := NewClient(server.URL, time.Minute) + + if err := c.Store(samples); err != nil { + t.Fatalf("Error sending samples: %s", err) + } +} diff --git a/storage/remote/opentsdb/client.go b/storage/remote/opentsdb/client.go index ecfa7cfcb..84e50977b 100644 --- a/storage/remote/opentsdb/client.go +++ b/storage/remote/opentsdb/client.go @@ -1,3 +1,16 @@ +// Copyright 2013 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package opentsdb import ( @@ -12,6 +25,7 @@ import ( "time" "github.com/golang/glog" + clientmodel "github.com/prometheus/client_golang/model" "github.com/prometheus/prometheus/utility" @@ -67,7 +81,7 @@ func (c *Client) Store(samples clientmodel.Samples) error { for _, s := range samples { v := float64(s.Value) if math.IsNaN(v) || math.IsInf(v, 0) { - glog.Warningf("cannot send value %d to OpenTSDB, skipping sample %#v", v, s) + glog.Warningf("cannot send value %f to OpenTSDB, skipping sample %#v", v, s) continue } metric := TagValue(s.Metric[clientmodel.MetricNameLabel]) @@ -120,3 +134,8 @@ func (c *Client) Store(samples clientmodel.Samples) error { } return fmt.Errorf("failed to write %d samples to OpenTSDB, %d succeeded", r["failed"], r["success"]) } + +// Name identifies the client as an OpenTSDB client. +func (c Client) Name() string { + return "opentsdb" +} diff --git a/storage/remote/opentsdb/client_test.go b/storage/remote/opentsdb/client_test.go index 0e222a4c6..65b31f4f2 100644 --- a/storage/remote/opentsdb/client_test.go +++ b/storage/remote/opentsdb/client_test.go @@ -64,11 +64,11 @@ func TestMarshalStoreSamplesRequest(t *testing.T) { var unmarshaledRequest StoreSamplesRequest err = json.Unmarshal(expectedJSON, &unmarshaledRequest) if err != nil { - t.Fatalf("Unarshal(expectedJSON, &unmarshaledRequest) resulted in err: %s", err) + t.Fatalf("Unmarshal(expectedJSON, &unmarshaledRequest) resulted in err: %s", err) } if !reflect.DeepEqual(unmarshaledRequest, request) { t.Errorf( - "Unarshal(expectedJSON, &unmarshaledRequest) => %#v, want %#v", + "Unmarshal(expectedJSON, &unmarshaledRequest) => %#v, want %#v", unmarshaledRequest, request, ) } diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go index 475b4b106..35f6105cf 100644 --- a/storage/remote/queue_manager.go +++ b/storage/remote/queue_manager.go @@ -23,9 +23,9 @@ import ( ) const ( - // The maximum number of concurrent send requests to the TSDB. + // The maximum number of concurrent send requests to the remote storage. maxConcurrentSends = 10 - // The maximum number of samples to fit into a single request to the TSDB. + // The maximum number of samples to fit into a single request to the remote storage. maxSamplesPerSend = 100 // The deadline after which to send queued samples even if the maximum batch // size has not been reached. @@ -43,16 +43,19 @@ const ( dropped = "dropped" ) -// TSDBClient defines an interface for sending a batch of samples to an -// external timeseries database (TSDB). -type TSDBClient interface { +// StorageClient defines an interface for sending a batch of samples to an +// external timeseries database. +type StorageClient interface { + // Store stores the given samples in the remote storage. Store(clientmodel.Samples) error + // Name identifies the remote storage implementation. + Name() string } -// TSDBQueueManager manages a queue of samples to be sent to the TSDB indicated -// by the provided TSDBClient. -type TSDBQueueManager struct { - tsdb TSDBClient +// StorageQueueManager manages a queue of samples to be sent to the Storage +// indicated by the provided StorageClient. +type StorageQueueManager struct { + tsdb StorageClient queue chan *clientmodel.Sample pendingSamples clientmodel.Samples sendSemaphore chan bool @@ -65,9 +68,13 @@ type TSDBQueueManager struct { queueCapacity prometheus.Metric } -// NewTSDBQueueManager builds a new TSDBQueueManager. -func NewTSDBQueueManager(tsdb TSDBClient, queueCapacity int) *TSDBQueueManager { - return &TSDBQueueManager{ +// NewStorageQueueManager builds a new StorageQueueManager. +func NewStorageQueueManager(tsdb StorageClient, queueCapacity int) *StorageQueueManager { + constLabels := prometheus.Labels{ + "type": tsdb.Name(), + } + + return &StorageQueueManager{ tsdb: tsdb, queue: make(chan *clientmodel.Sample, queueCapacity), sendSemaphore: make(chan bool, maxConcurrentSends), @@ -75,36 +82,41 @@ func NewTSDBQueueManager(tsdb TSDBClient, queueCapacity int) *TSDBQueueManager { samplesCount: prometheus.NewCounterVec( prometheus.CounterOpts{ - Namespace: namespace, - Subsystem: subsystem, - Name: "sent_samples_total", - Help: "Total number of processed samples to be sent to remote TSDB.", + Namespace: namespace, + Subsystem: subsystem, + Name: "sent_samples_total", + Help: "Total number of processed samples to be sent to remote storage.", + ConstLabels: constLabels, }, []string{result}, ), sendLatency: prometheus.NewSummary(prometheus.SummaryOpts{ - Namespace: namespace, - Subsystem: subsystem, - Name: "sent_latency_milliseconds", - Help: "Latency quantiles for sending sample batches to the remote TSDB.", + Namespace: namespace, + Subsystem: subsystem, + Name: "sent_latency_milliseconds", + Help: "Latency quantiles for sending sample batches to the remote storage.", + ConstLabels: constLabels, }), sendErrors: prometheus.NewCounter(prometheus.CounterOpts{ - Namespace: namespace, - Subsystem: subsystem, - Name: "sent_errors_total", - Help: "Total number of errors sending sample batches to the remote TSDB.", + Namespace: namespace, + Subsystem: subsystem, + Name: "sent_errors_total", + Help: "Total number of errors sending sample batches to the remote storage.", + ConstLabels: constLabels, }), queueLength: prometheus.NewGauge(prometheus.GaugeOpts{ - Namespace: namespace, - Subsystem: subsystem, - Name: "queue_length", - Help: "The number of processed samples queued to be sent to the remote TSDB.", + Namespace: namespace, + Subsystem: subsystem, + Name: "queue_length", + Help: "The number of processed samples queued to be sent to the remote storage.", + ConstLabels: constLabels, }), queueCapacity: prometheus.MustNewConstMetric( prometheus.NewDesc( prometheus.BuildFQName(namespace, subsystem, "queue_capacity"), - "The capacity of the queue of samples to be sent to the remote TSDB.", - nil, nil, + "The capacity of the queue of samples to be sent to the remote storage.", + nil, + constLabels, ), prometheus.GaugeValue, float64(queueCapacity), @@ -112,20 +124,21 @@ func NewTSDBQueueManager(tsdb TSDBClient, queueCapacity int) *TSDBQueueManager { } } -// Append queues a sample to be sent to the TSDB. It drops the sample on the -// floor if the queue is full. It implements storage.SampleAppender. -func (t *TSDBQueueManager) Append(s *clientmodel.Sample) { +// Append queues a sample to be sent to the remote storage. It drops the +// sample on the floor if the queue is full. It implements +// storage.SampleAppender. +func (t *StorageQueueManager) Append(s *clientmodel.Sample) { select { case t.queue <- s: default: t.samplesCount.WithLabelValues(dropped).Inc() - glog.Warning("TSDB queue full, discarding sample.") + glog.Warning("Remote storage queue full, discarding sample.") } } -// Stop stops sending samples to the TSDB and waits for pending sends to -// complete. -func (t *TSDBQueueManager) Stop() { +// Stop stops sending samples to the remote storage and waits for pending +// sends to complete. +func (t *StorageQueueManager) Stop() { glog.Infof("Stopping remote storage...") close(t.queue) <-t.drained @@ -136,7 +149,7 @@ func (t *TSDBQueueManager) Stop() { } // Describe implements prometheus.Collector. -func (t *TSDBQueueManager) Describe(ch chan<- *prometheus.Desc) { +func (t *StorageQueueManager) Describe(ch chan<- *prometheus.Desc) { t.samplesCount.Describe(ch) t.sendLatency.Describe(ch) ch <- t.queueLength.Desc() @@ -144,7 +157,7 @@ func (t *TSDBQueueManager) Describe(ch chan<- *prometheus.Desc) { } // Collect implements prometheus.Collector. -func (t *TSDBQueueManager) Collect(ch chan<- prometheus.Metric) { +func (t *StorageQueueManager) Collect(ch chan<- prometheus.Metric) { t.samplesCount.Collect(ch) t.sendLatency.Collect(ch) t.queueLength.Set(float64(len(t.queue))) @@ -152,21 +165,22 @@ func (t *TSDBQueueManager) Collect(ch chan<- prometheus.Metric) { ch <- t.queueCapacity } -func (t *TSDBQueueManager) sendSamples(s clientmodel.Samples) { +func (t *StorageQueueManager) sendSamples(s clientmodel.Samples) { t.sendSemaphore <- true defer func() { <-t.sendSemaphore }() - // Samples are sent to the TSDB on a best-effort basis. If a sample isn't - // sent correctly the first time, it's simply dropped on the floor. + // Samples are sent to the remote storage on a best-effort basis. If a + // sample isn't sent correctly the first time, it's simply dropped on the + // floor. begin := time.Now() err := t.tsdb.Store(s) duration := time.Since(begin) / time.Millisecond labelValue := success if err != nil { - glog.Warningf("error sending %d samples to TSDB: %s", len(s), err) + glog.Warningf("error sending %d samples to remote storage: %s", len(s), err) labelValue = failure t.sendErrors.Inc() } @@ -174,19 +188,20 @@ func (t *TSDBQueueManager) sendSamples(s clientmodel.Samples) { t.sendLatency.Observe(float64(duration)) } -// Run continuously sends samples to the TSDB. -func (t *TSDBQueueManager) Run() { +// Run continuously sends samples to the remote storage. +func (t *StorageQueueManager) Run() { defer func() { close(t.drained) }() - // Send batches of at most maxSamplesPerSend samples to the TSDB. If we - // have fewer samples than that, flush them out after a deadline anyways. + // Send batches of at most maxSamplesPerSend samples to the remote storage. + // If we have fewer samples than that, flush them out after a deadline + // anyways. for { select { case s, ok := <-t.queue: if !ok { - glog.Infof("Flushing %d samples to OpenTSDB...", len(t.pendingSamples)) + glog.Infof("Flushing %d samples to remote storage...", len(t.pendingSamples)) t.flush() glog.Infof("Done flushing.") return @@ -205,7 +220,7 @@ func (t *TSDBQueueManager) Run() { } // Flush flushes remaining queued samples. -func (t *TSDBQueueManager) flush() { +func (t *StorageQueueManager) flush() { if len(t.pendingSamples) > 0 { go t.sendSamples(t.pendingSamples) } diff --git a/storage/remote/queue_manager_test.go b/storage/remote/queue_manager_test.go index 344d77ce4..b308ca433 100644 --- a/storage/remote/queue_manager_test.go +++ b/storage/remote/queue_manager_test.go @@ -20,18 +20,18 @@ import ( clientmodel "github.com/prometheus/client_golang/model" ) -type TestTSDBClient struct { +type TestStorageClient struct { receivedSamples clientmodel.Samples expectedSamples clientmodel.Samples wg sync.WaitGroup } -func (c *TestTSDBClient) expectSamples(s clientmodel.Samples) { +func (c *TestStorageClient) expectSamples(s clientmodel.Samples) { c.expectedSamples = append(c.expectedSamples, s...) c.wg.Add(len(s)) } -func (c *TestTSDBClient) waitForExpectedSamples(t *testing.T) { +func (c *TestStorageClient) waitForExpectedSamples(t *testing.T) { c.wg.Wait() for i, expected := range c.expectedSamples { if !expected.Equal(c.receivedSamples[i]) { @@ -40,12 +40,16 @@ func (c *TestTSDBClient) waitForExpectedSamples(t *testing.T) { } } -func (c *TestTSDBClient) Store(s clientmodel.Samples) error { +func (c *TestStorageClient) Store(s clientmodel.Samples) error { c.receivedSamples = append(c.receivedSamples, s...) c.wg.Add(-len(s)) return nil } +func (c TestStorageClient) Name() string { + return "teststorageclient" +} + func TestSampleDelivery(t *testing.T) { // Let's create an even number of send batches so we don't run into the // batch timeout case. @@ -61,9 +65,9 @@ func TestSampleDelivery(t *testing.T) { }) } - c := &TestTSDBClient{} + c := &TestStorageClient{} c.expectSamples(samples[:len(samples)/2]) - m := NewTSDBQueueManager(c, len(samples)/2) + m := NewStorageQueueManager(c, len(samples)/2) // These should be received by the client. for _, s := range samples[:len(samples)/2] { diff --git a/storage/storage.go b/storage/storage.go index 4984f9d68..68ac6f649 100644 --- a/storage/storage.go +++ b/storage/storage.go @@ -23,16 +23,15 @@ type SampleAppender interface { Append(*clientmodel.Sample) } -// Tee is a SampleAppender that appends every sample to two other +// Fanout is a SampleAppender that appends every sample to a list of other // SampleAppenders. -type Tee struct { - Appender1, Appender2 SampleAppender -} +type Fanout []SampleAppender -// Append implements SampleAppender. It appends the provided sample first -// to Appender1, then to Appender2, waiting for each to return before -// proceeding. -func (t Tee) Append(s *clientmodel.Sample) { - t.Appender1.Append(s) - t.Appender2.Append(s) +// Append implements SampleAppender. It appends the provided sample to all +// SampleAppenders in the Fanout slice and waits for each append to complete +// before proceeding with the next. +func (f Fanout) Append(s *clientmodel.Sample) { + for _, a := range f { + a.Append(s) + } }