From 2dda5775e318cdbd325dc63cd8596b7451c20dcc Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Wed, 10 May 2017 10:44:13 +0100 Subject: [PATCH] Initial port of remote storage to v2. --- cmd/prometheus/main.go | 18 +- storage/fanout.go | 374 +++++++++++++++ storage/remote/client.go | 200 ++++++++ storage/remote/client_test.go | 79 ++++ storage/remote/ewma.go | 68 +++ storage/remote/queue_manager.go | 512 +++++++++++++++++++++ storage/remote/queue_manager_test.go | 253 ++++++++++ storage/remote/read.go | 256 +++++++++++ storage/remote/read_test.go | 94 ++++ storage/remote/remote.pb.go | 312 +++++++++++++ storage/remote/remote.proto | 68 +++ storage/remote/storage.go | 101 ++++ storage/remote/write.go | 57 +++ vendor/golang.org/x/time/LICENSE | 27 ++ vendor/golang.org/x/time/PATENTS | 22 + vendor/golang.org/x/time/rate/rate.go | 380 +++++++++++++++ vendor/golang.org/x/time/rate/rate_go16.go | 21 + vendor/golang.org/x/time/rate/rate_go17.go | 21 + vendor/vendor.json | 6 + 19 files changed, 2861 insertions(+), 8 deletions(-) create mode 100644 storage/fanout.go create mode 100644 storage/remote/client.go create mode 100644 storage/remote/client_test.go create mode 100644 storage/remote/ewma.go create mode 100644 storage/remote/queue_manager.go create mode 100644 storage/remote/queue_manager_test.go create mode 100644 storage/remote/read.go create mode 100644 storage/remote/read_test.go create mode 100644 storage/remote/remote.pb.go create mode 100644 storage/remote/remote.proto create mode 100644 storage/remote/storage.go create mode 100644 storage/remote/write.go create mode 100644 vendor/golang.org/x/time/LICENSE create mode 100644 vendor/golang.org/x/time/PATENTS create mode 100644 vendor/golang.org/x/time/rate/rate.go create mode 100644 vendor/golang.org/x/time/rate/rate_go16.go create mode 100644 vendor/golang.org/x/time/rate/rate_go17.go diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index a8db09a05..9eb6d3b69 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -40,6 +40,8 @@ import ( "github.com/prometheus/prometheus/promql" "github.com/prometheus/prometheus/retrieval" "github.com/prometheus/prometheus/rules" + "github.com/prometheus/prometheus/storage" + "github.com/prometheus/prometheus/storage/remote" "github.com/prometheus/prometheus/storage/tsdb" "github.com/prometheus/prometheus/web" ) @@ -221,20 +223,20 @@ func main() { } logger.Infoln("tsdb started") - // remoteStorage := &remote.Storage{} - // sampleAppender = append(sampleAppender, remoteStorage) - // reloadables = append(reloadables, remoteStorage) + remoteStorage := &remote.Storage{} + reloadables = append(reloadables, remoteStorage) + fanoutStorage := storage.NewFanout(tsdb.Adapter(localStorage), remoteStorage) cfg.queryEngine.Logger = logger var ( notifier = notifier.New(&cfg.notifier, logger) - targetManager = retrieval.NewTargetManager(tsdb.Adapter(localStorage), logger) - queryEngine = promql.NewEngine(tsdb.Adapter(localStorage), &cfg.queryEngine) + targetManager = retrieval.NewTargetManager(fanoutStorage, logger) + queryEngine = promql.NewEngine(fanoutStorage, &cfg.queryEngine) ctx, cancelCtx = context.WithCancel(context.Background()) ) ruleManager := rules.NewManager(&rules.ManagerOptions{ - Appendable: tsdb.Adapter(localStorage), + Appendable: fanoutStorage, Notifier: notifier, QueryEngine: queryEngine, Context: ctx, @@ -296,8 +298,8 @@ func main() { // Start all components. The order is NOT arbitrary. 
	defer func() {
-		if err := localStorage.Close(); err != nil {
-			logger.Errorln("Error stopping storage:", err)
+		if err := fanoutStorage.Close(); err != nil {
+			log.Errorln("Error stopping storage:", err)
		}
	}()
diff --git a/storage/fanout.go b/storage/fanout.go
new file mode 100644
index 000000000..56e698073
--- /dev/null
+++ b/storage/fanout.go
@@ -0,0 +1,374 @@
+// Copyright 2017 The Prometheus Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package storage
+
+import (
+	"container/heap"
+	"strings"
+
+	"github.com/prometheus/prometheus/pkg/labels"
+)
+
+type fanout struct {
+	storages []Storage
+}
+
+// NewFanout returns a new fan-out Storage, which proxies reads and writes
+// through to multiple underlying storages.
+func NewFanout(storages ...Storage) Storage {
+	return &fanout{
+		storages: storages,
+	}
+}
+
+func (f *fanout) Querier(mint, maxt int64) (Querier, error) {
+	queriers := mergeQuerier{
+		queriers: make([]Querier, 0, len(f.storages)),
+	}
+	for _, storage := range f.storages {
+		querier, err := storage.Querier(mint, maxt)
+		if err != nil {
+			queriers.Close()
+			return nil, err
+		}
+		queriers.queriers = append(queriers.queriers, querier)
+	}
+	return &queriers, nil
+}
+
+func (f *fanout) Appender() (Appender, error) {
+	appenders := make([]Appender, 0, len(f.storages))
+	for _, storage := range f.storages {
+		appender, err := storage.Appender()
+		if err != nil {
+			return nil, err
+		}
+		appenders = append(appenders, appender)
+	}
+	return &fanoutAppender{
+		appenders: appenders,
+	}, nil
+}
+
+// Close closes the storage and all its underlying resources.
+func (f *fanout) Close() error {
+	// TODO return multiple errors?
+	var lastErr error
+	for _, storage := range f.storages {
+		if err := storage.Close(); err != nil {
+			lastErr = err
+		}
+	}
+	return lastErr
+}
+
+// fanoutAppender implements Appender.
+type fanoutAppender struct {
+	appenders []Appender
+}
+
+func (f *fanoutAppender) Add(l labels.Labels, t int64, v float64) (string, error) {
+	for _, appender := range f.appenders {
+		if _, err := appender.Add(l, t, v); err != nil {
+			return "", err
+		}
+	}
+	return "", nil
+}
+
+func (f *fanoutAppender) AddFast(ref string, t int64, v float64) error {
+	// TODO this is a cheat, and causes us to fall back to the slow path even
+	// for local writes.
+	return ErrNotFound
+}
+
+func (f *fanoutAppender) Commit() error {
+	for _, appender := range f.appenders {
+		if err := appender.Commit(); err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+func (f *fanoutAppender) Rollback() error {
+	for _, appender := range f.appenders {
+		if err := appender.Rollback(); err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+// mergeQuerier implements Querier.
+type mergeQuerier struct {
+	queriers []Querier
+}
+
+// NewMergeQuerier returns a new Querier that merges the results of the
+// given queriers.
+func NewMergeQuerier(queriers []Querier) Querier {
+	return &mergeQuerier{
+		queriers: queriers,
+	}
+}
+
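For reviewers: the mergeSeriesSet further down applies the classic heap-based k-way merge via container/heap. A self-contained sketch of the same technique over sorted int slices (all names and values here are invented for illustration, not part of this patch):

// Illustrative sketch: k-way merge of sorted slices with container/heap.
package main

import (
	"container/heap"
	"fmt"
)

type cursor struct {
	vals []int
	pos  int
}

type cursorHeap []*cursor

func (h cursorHeap) Len() int            { return len(h) }
func (h cursorHeap) Less(i, j int) bool  { return h[i].vals[h[i].pos] < h[j].vals[h[j].pos] }
func (h cursorHeap) Swap(i, j int)       { h[i], h[j] = h[j], h[i] }
func (h *cursorHeap) Push(x interface{}) { *h = append(*h, x.(*cursor)) }
func (h *cursorHeap) Pop() interface{} {
	old := *h
	x := old[len(old)-1]
	*h = old[:len(old)-1]
	return x
}

func mergeSorted(lists ...[]int) []int {
	h := cursorHeap{}
	for _, l := range lists {
		if len(l) > 0 {
			h = append(h, &cursor{vals: l})
		}
	}
	heap.Init(&h)
	var out []int
	for h.Len() > 0 {
		c := h[0]
		out = append(out, c.vals[c.pos]) // smallest head across all lists
		c.pos++
		if c.pos == len(c.vals) {
			heap.Pop(&h) // this list is exhausted
		} else {
			heap.Fix(&h, 0) // re-establish heap order after advancing
		}
	}
	return out
}

func main() {
	fmt.Println(mergeSorted([]int{1, 4, 9}, []int{2, 4, 8}, []int{3}))
	// [1 2 3 4 4 8 9]
}
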
+// Select returns a set of series that matches the given label matchers.
+func (q *mergeQuerier) Select(matchers ...*labels.Matcher) SeriesSet {
+	seriesSets := make([]SeriesSet, 0, len(q.queriers))
+	for _, querier := range q.queriers {
+		seriesSets = append(seriesSets, querier.Select(matchers...))
+	}
+	return newMergeSeriesSet(seriesSets)
+}
+
+// LabelValues returns all potential values for a label name.
+func (q *mergeQuerier) LabelValues(name string) ([]string, error) {
+	var results [][]string
+	for _, querier := range q.queriers {
+		values, err := querier.LabelValues(name)
+		if err != nil {
+			return nil, err
+		}
+		results = append(results, values)
+	}
+	return mergeStringSlices(results), nil
+}
+
+func mergeStringSlices(ss [][]string) []string {
+	switch len(ss) {
+	case 0:
+		return nil
+	case 1:
+		return ss[0]
+	case 2:
+		return mergeTwoStringSlices(ss[0], ss[1])
+	default:
+		halfway := len(ss) / 2
+		return mergeTwoStringSlices(
+			mergeStringSlices(ss[:halfway]),
+			mergeStringSlices(ss[halfway:]),
+		)
+	}
+}
+
+func mergeTwoStringSlices(a, b []string) []string {
+	i, j := 0, 0
+	result := make([]string, 0, len(a)+len(b))
+	for i < len(a) && j < len(b) {
+		switch strings.Compare(a[i], b[j]) {
+		case 0:
+			result = append(result, a[i])
+			i++
+			j++
+		case -1:
+			result = append(result, a[i])
+			i++
+		case 1:
+			result = append(result, b[j])
+			j++
+		}
+	}
+	result = append(result, a[i:]...)
+	result = append(result, b[j:]...)
+	return result
+}
+
+// Close releases the resources of the Querier.
+func (q *mergeQuerier) Close() error {
+	// TODO return multiple errors?
+	var lastErr error
+	for _, querier := range q.queriers {
+		if err := querier.Close(); err != nil {
+			lastErr = err
+		}
+	}
+	return lastErr
+}
+
+// mergeSeriesSet implements SeriesSet.
+type mergeSeriesSet struct {
+	currentLabels labels.Labels
+	currentSets   []SeriesSet
+	sets          seriesSetHeap
+}
+
+func newMergeSeriesSet(sets []SeriesSet) SeriesSet {
+	// Sets need to be pre-advanced, so we can introspect the label of the
+	// series under the cursor.
+	var h seriesSetHeap
+	for _, set := range sets {
+		if set.Next() {
+			heap.Push(&h, set)
+		}
+	}
+	return &mergeSeriesSet{
+		sets: h,
+	}
+}
+
+func (c *mergeSeriesSet) Next() bool {
+	// Firstly advance all the current series sets. If any of them have run out
+	// we can drop them, otherwise they should be inserted back into the heap.
+	for _, set := range c.currentSets {
+		if set.Next() {
+			heap.Push(&c.sets, set)
+		}
+	}
+	if len(c.sets) == 0 {
+		return false
+	}
+
+	// Now, pop items of the heap that have equal label sets.
+	c.currentSets = nil
+	c.currentLabels = c.sets[0].At().Labels()
+	for len(c.sets) > 0 && labels.Equal(c.currentLabels, c.sets[0].At().Labels()) {
+		set := heap.Pop(&c.sets).(SeriesSet)
+		c.currentSets = append(c.currentSets, set)
+	}
+	return true
+}
+
+func (c *mergeSeriesSet) At() Series {
+	series := []Series{}
+	for _, seriesSet := range c.currentSets {
+		series = append(series, seriesSet.At())
+	}
+	return &mergeSeries{
+		labels: c.currentLabels,
+		series: series,
+	}
+}
+
+func (c *mergeSeriesSet) Err() error {
+	for _, set := range c.sets {
+		if err := set.Err(); err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+type seriesSetHeap []SeriesSet
+
+func (h seriesSetHeap) Len() int      { return len(h) }
+func (h seriesSetHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
+
+func (h seriesSetHeap) Less(i, j int) bool {
+	a, b := h[i].At().Labels(), h[j].At().Labels()
+	return labels.Compare(a, b) < 0
+}
+
+func (h *seriesSetHeap) Push(x interface{}) {
+	*h = append(*h, x.(SeriesSet))
+}
+
+func (h *seriesSetHeap) Pop() interface{} {
+	old := *h
+	n := len(old)
+	x := old[n-1]
+	*h = old[0 : n-1]
+	return x
+}
+
+type mergeSeries struct {
+	labels labels.Labels
+	series []Series
+}
+
+func (m *mergeSeries) Labels() labels.Labels {
+	return m.labels
+}
+
+func (m *mergeSeries) Iterator() SeriesIterator {
+	iterators := make([]SeriesIterator, 0, len(m.series))
+	for _, s := range m.series {
+		iterators = append(iterators, s.Iterator())
+	}
+	return &mergeIterator{
+		iterators: iterators,
+	}
+}
+
+type mergeIterator struct {
+	iterators []SeriesIterator
+	h         seriesIteratorHeap
+}
+
+func newMergeIterator(iterators []SeriesIterator) SeriesIterator {
+	return &mergeIterator{
+		iterators: iterators,
+		h:         nil,
+	}
+}
+
+func (c *mergeIterator) Seek(t int64) bool {
+	c.h = seriesIteratorHeap{}
+	for _, iter := range c.iterators {
+		if iter.Seek(t) {
+			heap.Push(&c.h, iter)
+		}
+	}
+	return len(c.h) > 0
+}
+
+func (c *mergeIterator) At() (t int64, v float64) {
+	// TODO do I need to dedupe or just merge?
+	return c.h[0].At()
+}
+
+func (c *mergeIterator) Next() bool {
+	// Detect the case where Next() is called before Seek().
+	if c.h == nil {
+		panic("Next() called before Seek()")
+	}
+
+	if len(c.h) == 0 {
+		return false
+	}
+	iter := heap.Pop(&c.h).(SeriesIterator)
+	if iter.Next() {
+		heap.Push(&c.h, iter)
+	}
+	return len(c.h) > 0
+}
+
+func (c *mergeIterator) Err() error {
+	for _, iter := range c.iterators {
+		if err := iter.Err(); err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+type seriesIteratorHeap []SeriesIterator
+
+func (h seriesIteratorHeap) Len() int      { return len(h) }
+func (h seriesIteratorHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
+
+func (h seriesIteratorHeap) Less(i, j int) bool {
+	at, _ := h[i].At()
+	bt, _ := h[j].At()
+	return at < bt
+}
+
+func (h *seriesIteratorHeap) Push(x interface{}) {
+	*h = append(*h, x.(SeriesIterator))
+}
+
+func (h *seriesIteratorHeap) Pop() interface{} {
+	old := *h
+	n := len(old)
+	x := old[n-1]
+	*h = old[0 : n-1]
+	return x
+}
diff --git a/storage/remote/client.go b/storage/remote/client.go
new file mode 100644
index 000000000..f3a7dee69
--- /dev/null
+++ b/storage/remote/client.go
@@ -0,0 +1,200 @@
+// Copyright 2016 The Prometheus Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package remote + +import ( + "bufio" + "bytes" + "fmt" + "io" + "io/ioutil" + "net/http" + "time" + + "github.com/golang/protobuf/proto" + "github.com/golang/snappy" + "golang.org/x/net/context" + "golang.org/x/net/context/ctxhttp" + + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/config" + "github.com/prometheus/prometheus/util/httputil" +) + +const maxErrMsgLen = 256 + +// Client allows reading and writing from/to a remote HTTP endpoint. +type Client struct { + index int // Used to differentiate metrics. + url *config.URL + client *http.Client + timeout time.Duration +} + +type clientConfig struct { + url *config.URL + timeout model.Duration + httpClientConfig config.HTTPClientConfig +} + +// NewClient creates a new Client. +func NewClient(index int, conf *clientConfig) (*Client, error) { + httpClient, err := httputil.NewClientFromConfig(conf.httpClientConfig) + if err != nil { + return nil, err + } + + return &Client{ + index: index, + url: conf.url, + client: httpClient, + timeout: time.Duration(conf.timeout), + }, nil +} + +type recoverableError struct { + error +} + +// Store sends a batch of samples to the HTTP endpoint. +func (c *Client) Store(samples model.Samples) error { + req := &WriteRequest{ + Timeseries: make([]*TimeSeries, 0, len(samples)), + } + for _, s := range samples { + ts := &TimeSeries{ + Labels: make([]*LabelPair, 0, len(s.Metric)), + } + for k, v := range s.Metric { + ts.Labels = append(ts.Labels, + &LabelPair{ + Name: string(k), + Value: string(v), + }) + } + ts.Samples = []*Sample{ + { + Value: float64(s.Value), + TimestampMs: int64(s.Timestamp), + }, + } + req.Timeseries = append(req.Timeseries, ts) + } + + data, err := proto.Marshal(req) + if err != nil { + return err + } + + compressed := snappy.Encode(nil, data) + httpReq, err := http.NewRequest("POST", c.url.String(), bytes.NewReader(compressed)) + if err != nil { + // Errors from NewRequest are from unparseable URLs, so are not + // recoverable. + return err + } + httpReq.Header.Add("Content-Encoding", "snappy") + httpReq.Header.Set("Content-Type", "application/x-protobuf") + httpReq.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.0") + + ctx, cancel := context.WithTimeout(context.Background(), c.timeout) + defer cancel() + + httpResp, err := ctxhttp.Do(ctx, c.client, httpReq) + if err != nil { + // Errors from client.Do are from (for example) network errors, so are + // recoverable. + return recoverableError{err} + } + defer httpResp.Body.Close() + + if httpResp.StatusCode/100 != 2 { + scanner := bufio.NewScanner(io.LimitReader(httpResp.Body, maxErrMsgLen)) + line := "" + if scanner.Scan() { + line = scanner.Text() + } + err = fmt.Errorf("server returned HTTP status %s: %s", httpResp.Status, line) + } + if httpResp.StatusCode/100 == 5 { + return recoverableError{err} + } + return err +} + +// Name identifies the client. +func (c Client) Name() string { + return fmt.Sprintf("%d:%s", c.index, c.url) +} + +// Read reads from a remote endpoint. 
+func (c *Client) Read(ctx context.Context, from, through int64, matchers []*LabelMatcher) ([]*TimeSeries, error) { + req := &ReadRequest{ + // TODO: Support batching multiple queries into one read request, + // as the protobuf interface allows for it. + Queries: []*Query{{ + StartTimestampMs: from, + EndTimestampMs: through, + Matchers: matchers, + }}, + } + + data, err := proto.Marshal(req) + if err != nil { + return nil, fmt.Errorf("unable to marshal read request: %v", err) + } + + compressed := snappy.Encode(nil, data) + httpReq, err := http.NewRequest("POST", c.url.String(), bytes.NewReader(compressed)) + if err != nil { + return nil, fmt.Errorf("unable to create request: %v", err) + } + httpReq.Header.Add("Content-Encoding", "snappy") + httpReq.Header.Set("Content-Type", "application/x-protobuf") + httpReq.Header.Set("X-Prometheus-Remote-Read-Version", "0.1.0") + + ctx, cancel := context.WithTimeout(ctx, c.timeout) + defer cancel() + + httpResp, err := ctxhttp.Do(ctx, c.client, httpReq) + if err != nil { + return nil, fmt.Errorf("error sending request: %v", err) + } + defer httpResp.Body.Close() + if httpResp.StatusCode/100 != 2 { + return nil, fmt.Errorf("server returned HTTP status %s", httpResp.Status) + } + + compressed, err = ioutil.ReadAll(httpResp.Body) + if err != nil { + return nil, fmt.Errorf("error reading response: %v", err) + } + + uncompressed, err := snappy.Decode(nil, compressed) + if err != nil { + return nil, fmt.Errorf("error reading response: %v", err) + } + + var resp ReadResponse + err = proto.Unmarshal(uncompressed, &resp) + if err != nil { + return nil, fmt.Errorf("unable to unmarshal response body: %v", err) + } + + if len(resp.Results) != len(req.Queries) { + return nil, fmt.Errorf("responses: want %d, got %d", len(req.Queries), len(resp.Results)) + } + + return resp.Results[0].Timeseries, nil +} diff --git a/storage/remote/client_test.go b/storage/remote/client_test.go new file mode 100644 index 000000000..32a2ae320 --- /dev/null +++ b/storage/remote/client_test.go @@ -0,0 +1,79 @@ +// Copyright 2017 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package remote
+
+import (
+	"fmt"
+	"net/http"
+	"net/http/httptest"
+	"net/url"
+	"reflect"
+	"strings"
+	"testing"
+	"time"
+
+	"github.com/prometheus/common/model"
+	"github.com/prometheus/prometheus/config"
+)
+
+var longErrMessage = strings.Repeat("error message", maxErrMsgLen)
+
+func TestStoreHTTPErrorHandling(t *testing.T) {
+	tests := []struct {
+		code int
+		err  error
+	}{
+		{
+			code: 200,
+			err:  nil,
+		},
+		{
+			code: 300,
+			err:  fmt.Errorf("server returned HTTP status 300 Multiple Choices: " + longErrMessage[:maxErrMsgLen]),
+		},
+		{
+			code: 404,
+			err:  fmt.Errorf("server returned HTTP status 404 Not Found: " + longErrMessage[:maxErrMsgLen]),
+		},
+		{
+			code: 500,
+			err:  recoverableError{fmt.Errorf("server returned HTTP status 500 Internal Server Error: " + longErrMessage[:maxErrMsgLen])},
+		},
+	}
+
+	for i, test := range tests {
+		server := httptest.NewServer(
+			http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+				http.Error(w, longErrMessage, test.code)
+			}),
+		)
+
+		serverURL, err := url.Parse(server.URL)
+		if err != nil {
+			panic(err)
+		}
+
+		c, err := NewClient(0, &clientConfig{
+			url:     &config.URL{serverURL},
+			timeout: model.Duration(time.Second),
+		})
+		if err != nil {
+			t.Fatal(err)
+		}
+
+		err = c.Store(nil)
+		if !reflect.DeepEqual(err, test.err) {
+			t.Errorf("%d. Unexpected error; want %v, got %v", i, test.err, err)
+		}
+
+		server.Close()
+	}
+}
diff --git a/storage/remote/ewma.go b/storage/remote/ewma.go
new file mode 100644
index 000000000..82b6dd101
--- /dev/null
+++ b/storage/remote/ewma.go
@@ -0,0 +1,68 @@
+// Copyright 2013 The Prometheus Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package remote
+
+import (
+	"sync"
+	"sync/atomic"
+	"time"
+)
+
+// ewmaRate tracks an exponentially weighted moving average of a per-second rate.
+type ewmaRate struct {
+	newEvents int64
+	alpha     float64
+	interval  time.Duration
+	lastRate  float64
+	init      bool
+	mutex     sync.Mutex
+}
+
+// newEWMARate always allocates a new ewmaRate, as this guarantees the atomically
+// accessed int64 will be aligned on ARM. See prometheus#2666.
+func newEWMARate(alpha float64, interval time.Duration) *ewmaRate {
+	return &ewmaRate{
+		alpha:    alpha,
+		interval: interval,
+	}
+}
+
+// rate returns the per-second rate.
+func (r *ewmaRate) rate() float64 {
+	r.mutex.Lock()
+	defer r.mutex.Unlock()
+	return r.lastRate
+}
+
+// tick is expected to be called every r.interval.
+func (r *ewmaRate) tick() {
+	newEvents := atomic.LoadInt64(&r.newEvents)
+	atomic.AddInt64(&r.newEvents, -newEvents)
+	instantRate := float64(newEvents) / r.interval.Seconds()
+
+	r.mutex.Lock()
+	defer r.mutex.Unlock()
+
+	if r.init {
+		r.lastRate += r.alpha * (instantRate - r.lastRate)
+	} else {
+		r.init = true
+		r.lastRate = instantRate
+	}
+}
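For intuition, a self-contained sketch of the smoothing step performed by ewmaRate.tick above; the alpha value matches the ewmaWeight constant in queue_manager.go, but the observed rates are made-up inputs:

// Illustrative sketch: the EWMA update lastRate += alpha*(instant - lastRate).
package main

import "fmt"

func main() {
	const alpha = 0.2                       // same as ewmaWeight in queue_manager.go
	rates := []float64{100, 100, 400, 400}  // instantaneous samples/s per tick (invented)
	smoothed, initDone := 0.0, false
	for _, r := range rates {
		if initDone {
			smoothed += alpha * (r - smoothed) // weighted move toward the new observation
		} else {
			smoothed, initDone = r, true // first tick seeds the average
		}
		fmt.Printf("instant=%.0f smoothed=%.1f\n", r, smoothed)
	}
}

+// incr counts incr events.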
+func (r *ewmaRate) incr(incr int64) { + atomic.AddInt64(&r.newEvents, incr) +} diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go new file mode 100644 index 000000000..6813566a4 --- /dev/null +++ b/storage/remote/queue_manager.go @@ -0,0 +1,512 @@ +// Copyright 2013 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package remote + +import ( + "math" + "sync" + "time" + + "golang.org/x/time/rate" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/common/log" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/config" + "github.com/prometheus/prometheus/relabel" +) + +// String constants for instrumentation. +const ( + namespace = "prometheus" + subsystem = "remote_storage" + queue = "queue" + + // We track samples in/out and how long pushes take using an Exponentially + // Weighted Moving Average. + ewmaWeight = 0.2 + shardUpdateDuration = 10 * time.Second + + // Allow 30% too many shards before scaling down. + shardToleranceFraction = 0.3 + + // Limit to 1 log event every 10s + logRateLimit = 0.1 + logBurst = 10 +) + +var ( + succeededSamplesTotal = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "succeeded_samples_total", + Help: "Total number of samples successfully sent to remote storage.", + }, + []string{queue}, + ) + failedSamplesTotal = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "failed_samples_total", + Help: "Total number of samples which failed on send to remote storage.", + }, + []string{queue}, + ) + droppedSamplesTotal = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "dropped_samples_total", + Help: "Total number of samples which were dropped due to the queue being full.", + }, + []string{queue}, + ) + sentBatchDuration = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "sent_batch_duration_seconds", + Help: "Duration of sample batch send calls to the remote storage.", + Buckets: prometheus.DefBuckets, + }, + []string{queue}, + ) + queueLength = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "queue_length", + Help: "The number of processed samples queued to be sent to the remote storage.", + }, + []string{queue}, + ) + queueCapacity = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "queue_capacity", + Help: "The capacity of the queue of samples to be sent to the remote storage.", + }, + []string{queue}, + ) + numShards = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "shards", + Help: "The number of shards used for parallel sending to the remote storage.", + }, + []string{queue}, + ) +) + +func init() { + prometheus.MustRegister(succeededSamplesTotal) + 
prometheus.MustRegister(failedSamplesTotal) + prometheus.MustRegister(droppedSamplesTotal) + prometheus.MustRegister(sentBatchDuration) + prometheus.MustRegister(queueLength) + prometheus.MustRegister(queueCapacity) + prometheus.MustRegister(numShards) +} + +// QueueManagerConfig is the configuration for the queue used to write to remote +// storage. +type QueueManagerConfig struct { + // Number of samples to buffer per shard before we start dropping them. + QueueCapacity int + // Max number of shards, i.e. amount of concurrency. + MaxShards int + // Maximum number of samples per send. + MaxSamplesPerSend int + // Maximum time sample will wait in buffer. + BatchSendDeadline time.Duration + // Max number of times to retry a batch on recoverable errors. + MaxRetries int + // On recoverable errors, backoff exponentially. + MinBackoff time.Duration + MaxBackoff time.Duration +} + +// defaultQueueManagerConfig is the default remote queue configuration. +var defaultQueueManagerConfig = QueueManagerConfig{ + // With a maximum of 1000 shards, assuming an average of 100ms remote write + // time and 100 samples per batch, we will be able to push 1M samples/s. + MaxShards: 1000, + MaxSamplesPerSend: 100, + + // By default, buffer 1000 batches, which at 100ms per batch is 1:40mins. At + // 1000 shards, this will buffer 100M samples total. + QueueCapacity: 100 * 1000, + BatchSendDeadline: 5 * time.Second, + + // Max number of times to retry a batch on recoverable errors. + MaxRetries: 10, + MinBackoff: 30 * time.Millisecond, + MaxBackoff: 100 * time.Millisecond, +} + +// StorageClient defines an interface for sending a batch of samples to an +// external timeseries database. +type StorageClient interface { + // Store stores the given samples in the remote storage. + Store(model.Samples) error + // Name identifies the remote storage implementation. + Name() string +} + +// QueueManager manages a queue of samples to be sent to the Storage +// indicated by the provided StorageClient. +type QueueManager struct { + cfg QueueManagerConfig + externalLabels model.LabelSet + relabelConfigs []*config.RelabelConfig + client StorageClient + queueName string + logLimiter *rate.Limiter + + shardsMtx sync.Mutex + shards *shards + numShards int + reshardChan chan int + quit chan struct{} + wg sync.WaitGroup + + samplesIn, samplesOut, samplesOutDuration *ewmaRate + integralAccumulator float64 +} + +// NewQueueManager builds a new QueueManager. +func NewQueueManager(cfg QueueManagerConfig, externalLabels model.LabelSet, relabelConfigs []*config.RelabelConfig, client StorageClient) *QueueManager { + t := &QueueManager{ + cfg: cfg, + externalLabels: externalLabels, + relabelConfigs: relabelConfigs, + client: client, + queueName: client.Name(), + + logLimiter: rate.NewLimiter(logRateLimit, logBurst), + numShards: 1, + reshardChan: make(chan int), + quit: make(chan struct{}), + + samplesIn: newEWMARate(ewmaWeight, shardUpdateDuration), + samplesOut: newEWMARate(ewmaWeight, shardUpdateDuration), + samplesOutDuration: newEWMARate(ewmaWeight, shardUpdateDuration), + } + t.shards = t.newShards(t.numShards) + numShards.WithLabelValues(t.queueName).Set(float64(t.numShards)) + queueCapacity.WithLabelValues(t.queueName).Set(float64(t.cfg.QueueCapacity)) + + return t +} + +// Append queues a sample to be sent to the remote storage. It drops the +// sample on the floor if the queue is full. +// Always returns nil. 
+func (t *QueueManager) Append(s *model.Sample) error {
+	snew := *s
+	snew.Metric = s.Metric.Clone()
+
+	for ln, lv := range t.externalLabels {
+		if _, ok := s.Metric[ln]; !ok {
+			snew.Metric[ln] = lv
+		}
+	}
+
+	snew.Metric = model.Metric(
+		relabel.Process(model.LabelSet(snew.Metric), t.relabelConfigs...))
+
+	if snew.Metric == nil {
+		return nil
+	}
+
+	t.shardsMtx.Lock()
+	enqueued := t.shards.enqueue(&snew)
+	t.shardsMtx.Unlock()
+
+	if enqueued {
+		queueLength.WithLabelValues(t.queueName).Inc()
+	} else {
+		droppedSamplesTotal.WithLabelValues(t.queueName).Inc()
+		if t.logLimiter.Allow() {
+			log.Warn("Remote storage queue full, discarding sample. Multiple subsequent messages of this kind may be suppressed.")
+		}
+	}
+	return nil
+}
+
+// NeedsThrottling implements storage.SampleAppender. It will always return
+// false as a remote storage drops samples on the floor if backlogging instead
+// of asking for throttling.
+func (*QueueManager) NeedsThrottling() bool {
+	return false
+}
+
+// Start the queue manager sending samples to the remote storage.
+// Does not block.
+func (t *QueueManager) Start() {
+	t.wg.Add(2)
+	go t.updateShardsLoop()
+	go t.reshardLoop()
+
+	t.shardsMtx.Lock()
+	defer t.shardsMtx.Unlock()
+	t.shards.start()
+}
+
+// Stop stops sending samples to the remote storage and waits for pending
+// sends to complete.
+func (t *QueueManager) Stop() {
+	log.Infof("Stopping remote storage...")
+	close(t.quit)
+	t.wg.Wait()
+
+	t.shardsMtx.Lock()
+	defer t.shardsMtx.Unlock()
+	t.shards.stop()
+	log.Info("Remote storage stopped.")
+}
+
+func (t *QueueManager) updateShardsLoop() {
+	defer t.wg.Done()
+
+	ticker := time.Tick(shardUpdateDuration)
+	for {
+		select {
+		case <-ticker:
+			t.calculateDesiredShards()
+		case <-t.quit:
+			return
+		}
+	}
+}
+
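A worked example of the shard arithmetic that calculateDesiredShards below performs; all rates are invented for illustration, and the integral accumulator term is omitted for brevity:

// Illustrative sketch: the desired-shards calculation with made-up rates.
package main

import (
	"fmt"
	"time"
)

func main() {
	var (
		samplesIn      = 120000.0 // samples/s arriving (invented)
		samplesOut     = 100000.0 // samples/s successfully sent (invented)
		samplesPending = samplesIn - samplesOut
		// EWMA of time spent sending, in ns per second: here one shard
		// saturated full-time (invented).
		samplesOutDuration = float64(time.Second)
	)
	// Average cost of sending one sample, in nanoseconds.
	timePerSample := samplesOutDuration / samplesOut
	// Shards needed to keep up with the input and also drain the backlog.
	desiredShards := timePerSample * (samplesIn + samplesPending) / float64(time.Second)
	fmt.Printf("desiredShards = %.2f\n", desiredShards) // 1.40 with these numbers
}
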
+func (t *QueueManager) calculateDesiredShards() {
+	t.samplesIn.tick()
+	t.samplesOut.tick()
+	t.samplesOutDuration.tick()
+
+	// We use the number of incoming samples as a prediction of how much work we
+	// will need to do next iteration. We add to this any pending samples
+	// (received - sent) so we can catch up with any backlog. We use the average
+	// outgoing batch latency to work out how many shards we need.
+	var (
+		samplesIn          = t.samplesIn.rate()
+		samplesOut         = t.samplesOut.rate()
+		samplesPending     = samplesIn - samplesOut
+		samplesOutDuration = t.samplesOutDuration.rate()
+	)
+
+	// We use an integral accumulator, like in a PID, to help dampen oscillation.
+	t.integralAccumulator = t.integralAccumulator + (samplesPending * 0.1)
+
+	if samplesOut <= 0 {
+		return
+	}
+
+	var (
+		timePerSample = samplesOutDuration / samplesOut
+		desiredShards = (timePerSample * (samplesIn + samplesPending + t.integralAccumulator)) / float64(time.Second)
+	)
+	log.Debugf("QueueManager.calculateDesiredShards samplesIn=%f, samplesOut=%f, samplesPending=%f, desiredShards=%f",
+		samplesIn, samplesOut, samplesPending, desiredShards)
+
+	// Changes in the number of shards must be greater than shardToleranceFraction.
+	var (
+		lowerBound = float64(t.numShards) * (1. - shardToleranceFraction)
+		upperBound = float64(t.numShards) * (1. + shardToleranceFraction)
+	)
+	log.Debugf("QueueManager.updateShardsLoop %f <= %f <= %f", lowerBound, desiredShards, upperBound)
+	if lowerBound <= desiredShards && desiredShards <= upperBound {
+		return
+	}
+
+	numShards := int(math.Ceil(desiredShards))
+	if numShards > t.cfg.MaxShards {
+		numShards = t.cfg.MaxShards
+	}
+	if numShards == t.numShards {
+		return
+	}
+
+	// Resharding can take some time, and we want this loop
+	// to stay close to shardUpdateDuration.
+	select {
+	case t.reshardChan <- numShards:
+		log.Infof("Remote storage resharding from %d to %d shards.", t.numShards, numShards)
+		t.numShards = numShards
+	default:
+		log.Infof("Currently resharding, skipping.")
+	}
+}
+
+func (t *QueueManager) reshardLoop() {
+	defer t.wg.Done()
+
+	for {
+		select {
+		case numShards := <-t.reshardChan:
+			t.reshard(numShards)
+		case <-t.quit:
+			return
+		}
+	}
+}
+
+func (t *QueueManager) reshard(n int) {
+	numShards.WithLabelValues(t.queueName).Set(float64(n))
+
+	t.shardsMtx.Lock()
+	newShards := t.newShards(n)
+	oldShards := t.shards
+	t.shards = newShards
+	t.shardsMtx.Unlock()
+
+	oldShards.stop()
+
+	// We start the newShards only after the oldShards have been stopped (and
+	// therefore completely flushed), to guarantee we only ever deliver samples
+	// in order.
+	newShards.start()
+}
+
+type shards struct {
+	qm     *QueueManager
+	queues []chan *model.Sample
+	done   chan struct{}
+	wg     sync.WaitGroup
+}
+
+func (t *QueueManager) newShards(numShards int) *shards {
+	queues := make([]chan *model.Sample, numShards)
+	for i := 0; i < numShards; i++ {
+		queues[i] = make(chan *model.Sample, t.cfg.QueueCapacity)
+	}
+	s := &shards{
+		qm:     t,
+		queues: queues,
+		done:   make(chan struct{}),
+	}
+	s.wg.Add(numShards)
+	return s
+}
+
+func (s *shards) len() int {
+	return len(s.queues)
+}
+
+func (s *shards) start() {
+	for i := 0; i < len(s.queues); i++ {
+		go s.runShard(i)
+	}
+}
+
+func (s *shards) stop() {
+	for _, shard := range s.queues {
+		close(shard)
+	}
+	s.wg.Wait()
+}
+
+func (s *shards) enqueue(sample *model.Sample) bool {
+	s.qm.samplesIn.incr(1)
+
+	fp := sample.Metric.FastFingerprint()
+	shard := uint64(fp) % uint64(len(s.queues))
+
+	select {
+	case s.queues[shard] <- sample:
+		return true
+	default:
+		return false
+	}
+}
+
+func (s *shards) runShard(i int) {
+	defer s.wg.Done()
+	queue := s.queues[i]
+
+	// Send batches of at most MaxSamplesPerSend samples to the remote storage.
+	// If we have fewer samples than that, flush them out after a deadline
+	// anyways.
+	pendingSamples := model.Samples{}
+
+	for {
+		select {
+		case sample, ok := <-queue:
+			if !ok {
+				if len(pendingSamples) > 0 {
+					log.Debugf("Flushing %d samples to remote storage...", len(pendingSamples))
+					s.sendSamples(pendingSamples)
+					log.Debugf("Done flushing.")
+				}
+				return
+			}
+
+			queueLength.WithLabelValues(s.qm.queueName).Dec()
+			pendingSamples = append(pendingSamples, sample)
+
+			for len(pendingSamples) >= s.qm.cfg.MaxSamplesPerSend {
+				s.sendSamples(pendingSamples[:s.qm.cfg.MaxSamplesPerSend])
+				pendingSamples = pendingSamples[s.qm.cfg.MaxSamplesPerSend:]
+			}
+		case <-time.After(s.qm.cfg.BatchSendDeadline):
+			if len(pendingSamples) > 0 {
+				s.sendSamples(pendingSamples)
+				pendingSamples = pendingSamples[:0]
+			}
+		}
+	}
+}
+
+func (s *shards) sendSamples(samples model.Samples) {
+	begin := time.Now()
+	s.sendSamplesWithBackoff(samples)
+
+	// These counters are used to calculate the dynamic sharding, and as such
+	// should be maintained irrespective of success or failure.
+ s.qm.samplesOut.incr(int64(len(samples))) + s.qm.samplesOutDuration.incr(int64(time.Since(begin))) +} + +// sendSamples to the remote storage with backoff for recoverable errors. +func (s *shards) sendSamplesWithBackoff(samples model.Samples) { + backoff := s.qm.cfg.MinBackoff + for retries := s.qm.cfg.MaxRetries; retries > 0; retries-- { + begin := time.Now() + err := s.qm.client.Store(samples) + + sentBatchDuration.WithLabelValues(s.qm.queueName).Observe(time.Since(begin).Seconds()) + if err == nil { + succeededSamplesTotal.WithLabelValues(s.qm.queueName).Add(float64(len(samples))) + return + } + + log.Warnf("Error sending %d samples to remote storage: %s", len(samples), err) + if _, ok := err.(recoverableError); !ok { + break + } + time.Sleep(backoff) + backoff = backoff * 2 + if backoff > s.qm.cfg.MaxBackoff { + backoff = s.qm.cfg.MaxBackoff + } + } + + failedSamplesTotal.WithLabelValues(s.qm.queueName).Add(float64(len(samples))) +} diff --git a/storage/remote/queue_manager_test.go b/storage/remote/queue_manager_test.go new file mode 100644 index 000000000..c97c00714 --- /dev/null +++ b/storage/remote/queue_manager_test.go @@ -0,0 +1,253 @@ +// Copyright 2013 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package remote + +import ( + "fmt" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/prometheus/common/model" +) + +type TestStorageClient struct { + receivedSamples map[string]model.Samples + expectedSamples map[string]model.Samples + wg sync.WaitGroup + mtx sync.Mutex +} + +func NewTestStorageClient() *TestStorageClient { + return &TestStorageClient{ + receivedSamples: map[string]model.Samples{}, + expectedSamples: map[string]model.Samples{}, + } +} + +func (c *TestStorageClient) expectSamples(ss model.Samples) { + c.mtx.Lock() + defer c.mtx.Unlock() + + for _, s := range ss { + ts := s.Metric.String() + c.expectedSamples[ts] = append(c.expectedSamples[ts], s) + } + c.wg.Add(len(ss)) +} + +func (c *TestStorageClient) waitForExpectedSamples(t *testing.T) { + c.wg.Wait() + + c.mtx.Lock() + defer c.mtx.Unlock() + for ts, expectedSamples := range c.expectedSamples { + for i, expected := range expectedSamples { + if !expected.Equal(c.receivedSamples[ts][i]) { + t.Fatalf("%d. Expected %v, got %v", i, expected, c.receivedSamples[ts][i]) + } + } + } +} + +func (c *TestStorageClient) Store(ss model.Samples) error { + c.mtx.Lock() + defer c.mtx.Unlock() + + for _, s := range ss { + ts := s.Metric.String() + c.receivedSamples[ts] = append(c.receivedSamples[ts], s) + } + c.wg.Add(-len(ss)) + return nil +} + +func (c *TestStorageClient) Name() string { + return "teststorageclient" +} + +func TestSampleDelivery(t *testing.T) { + // Let's create an even number of send batches so we don't run into the + // batch timeout case. 
+ n := defaultQueueManagerConfig.QueueCapacity * 2 + + samples := make(model.Samples, 0, n) + for i := 0; i < n; i++ { + name := model.LabelValue(fmt.Sprintf("test_metric_%d", i)) + samples = append(samples, &model.Sample{ + Metric: model.Metric{ + model.MetricNameLabel: name, + }, + Value: model.SampleValue(i), + }) + } + + c := NewTestStorageClient() + c.expectSamples(samples[:len(samples)/2]) + + cfg := defaultQueueManagerConfig + cfg.MaxShards = 1 + m := NewQueueManager(cfg, nil, nil, c) + + // These should be received by the client. + for _, s := range samples[:len(samples)/2] { + m.Append(s) + } + // These will be dropped because the queue is full. + for _, s := range samples[len(samples)/2:] { + m.Append(s) + } + m.Start() + defer m.Stop() + + c.waitForExpectedSamples(t) +} + +func TestSampleDeliveryOrder(t *testing.T) { + ts := 10 + n := defaultQueueManagerConfig.MaxSamplesPerSend * ts + + samples := make(model.Samples, 0, n) + for i := 0; i < n; i++ { + name := model.LabelValue(fmt.Sprintf("test_metric_%d", i%ts)) + samples = append(samples, &model.Sample{ + Metric: model.Metric{ + model.MetricNameLabel: name, + }, + Value: model.SampleValue(i), + Timestamp: model.Time(i), + }) + } + + c := NewTestStorageClient() + c.expectSamples(samples) + m := NewQueueManager(defaultQueueManagerConfig, nil, nil, c) + + // These should be received by the client. + for _, s := range samples { + m.Append(s) + } + m.Start() + defer m.Stop() + + c.waitForExpectedSamples(t) +} + +// TestBlockingStorageClient is a queue_manager StorageClient which will block +// on any calls to Store(), until the `block` channel is closed, at which point +// the `numCalls` property will contain a count of how many times Store() was +// called. +type TestBlockingStorageClient struct { + numCalls uint64 + block chan bool +} + +func NewTestBlockedStorageClient() *TestBlockingStorageClient { + return &TestBlockingStorageClient{ + block: make(chan bool), + numCalls: 0, + } +} + +func (c *TestBlockingStorageClient) Store(s model.Samples) error { + atomic.AddUint64(&c.numCalls, 1) + <-c.block + return nil +} + +func (c *TestBlockingStorageClient) NumCalls() uint64 { + return atomic.LoadUint64(&c.numCalls) +} + +func (c *TestBlockingStorageClient) unlock() { + close(c.block) +} + +func (c *TestBlockingStorageClient) Name() string { + return "testblockingstorageclient" +} + +func (t *QueueManager) queueLen() int { + t.shardsMtx.Lock() + defer t.shardsMtx.Unlock() + queueLength := 0 + for _, shard := range t.shards.queues { + queueLength += len(shard) + } + return queueLength +} + +func TestSpawnNotMoreThanMaxConcurrentSendsGoroutines(t *testing.T) { + // Our goal is to fully empty the queue: + // `MaxSamplesPerSend*Shards` samples should be consumed by the + // per-shard goroutines, and then another `MaxSamplesPerSend` + // should be left on the queue. + n := defaultQueueManagerConfig.MaxSamplesPerSend * 2 + + samples := make(model.Samples, 0, n) + for i := 0; i < n; i++ { + name := model.LabelValue(fmt.Sprintf("test_metric_%d", i)) + samples = append(samples, &model.Sample{ + Metric: model.Metric{ + model.MetricNameLabel: name, + }, + Value: model.SampleValue(i), + }) + } + + c := NewTestBlockedStorageClient() + cfg := defaultQueueManagerConfig + cfg.MaxShards = 1 + cfg.QueueCapacity = n + m := NewQueueManager(cfg, nil, nil, c) + + m.Start() + + defer func() { + c.unlock() + m.Stop() + }() + + for _, s := range samples { + m.Append(s) + } + + // Wait until the runShard() loops drain the queue. 
+	// If things went right, it
+	// should then immediately block in sendSamples(), but, in case of error,
+	// it would spawn too many goroutines, and thus we'd see more calls to
+	// client.Store().
+	//
+	// The timed wait is maybe non-ideal, but, in order to verify that we're
+	// not spawning too many concurrent goroutines, we have to wait on the
+	// runShard() loop to consume a specific number of elements from the
+	// queue... and it doesn't signal that in any obvious way, except by
+	// draining the queue. We cap the waiting at 1 second -- that should give
+	// plenty of time, and keeps the failure fairly quick if we're not draining
+	// the queue properly.
+	for i := 0; i < 100 && m.queueLen() > 0; i++ {
+		time.Sleep(10 * time.Millisecond)
+	}
+
+	if m.queueLen() != defaultQueueManagerConfig.MaxSamplesPerSend {
+		t.Fatalf("Failed to drain QueueManager queue, %d elements left",
+			m.queueLen(),
+		)
+	}
+
+	numCalls := c.NumCalls()
+	if numCalls != uint64(1) {
+		t.Errorf("Saw %d concurrent sends, expected 1", numCalls)
+	}
+}
diff --git a/storage/remote/read.go b/storage/remote/read.go
new file mode 100644
index 000000000..4eb1358d5
--- /dev/null
+++ b/storage/remote/read.go
@@ -0,0 +1,256 @@
+// Copyright 2017 The Prometheus Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package remote
+
+import (
+	"context"
+	"sort"
+
+	"github.com/prometheus/common/model"
+
+	"github.com/prometheus/prometheus/pkg/labels"
+	"github.com/prometheus/prometheus/storage"
+)
+
+// Querier returns a new Querier on the storage.
+func (r *Storage) Querier(mint, maxt int64) (storage.Querier, error) {
+	r.mtx.Lock()
+	defer r.mtx.Unlock()
+
+	queriers := make([]storage.Querier, 0, len(r.clients))
+	for _, c := range r.clients {
+		queriers = append(queriers, &querier{
+			mint:           mint,
+			maxt:           maxt,
+			client:         c,
+			externalLabels: r.externalLabels,
+		})
+	}
+	return storage.NewMergeQuerier(queriers), nil
+}
+
+// querier is an adapter to make a Client usable as a storage.Querier.
+type querier struct {
+	mint, maxt     int64
+	client         *Client
+	externalLabels model.LabelSet
+}
+
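For reviewers: the querier below folds the configured external labels into the matchers it sends upstream, then strips the labels it added from the returned series. A self-contained sketch of that round trip, using plain maps instead of the patch's matcher and label types (all names and values invented for illustration):

// Illustrative sketch: merging external labels into matchers, remembering
// what was added so it can be stripped from results, as
// addExternalLabels/removeLabels do below.
package main

import "fmt"

func main() {
	external := map[string]string{"region": "europe", "dc": "berlin-01"}
	// User-supplied equality matchers; dc is explicitly overridden.
	matchers := map[string]string{"job": "api-server", "dc": "munich-02"}

	added := map[string]string{}
	for name, value := range external {
		if _, ok := matchers[name]; !ok {
			matchers[name] = value // add a matcher for the external label
			added[name] = value    // remember it, to remove from results later
		}
	}
	fmt.Println("matchers sent to remote read:", matchers)
	fmt.Println("labels stripped from returned series:", added)
}
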
+// Select returns a set of series that matches the given label matchers.
+func (q *querier) Select(matchers ...*labels.Matcher) storage.SeriesSet {
+	m, added := q.addExternalLabels(matchers)
+
+	res, err := q.client.Read(context.TODO(), q.mint, q.maxt, labelMatchersToProto(m))
+	if err != nil {
+		return errSeriesSet{err: err}
+	}
+
+	series := make([]storage.Series, 0, len(res))
+	for _, ts := range res {
+		labels := labelPairsToLabels(ts.Labels)
+		labels = removeLabels(labels, added)
+		series = append(series, &concreteSeries{
+			labels:  labels,
+			samples: ts.Samples,
+		})
+	}
+	sort.Sort(byLabel(series))
+	return &concreteSeriesSet{
+		series: series,
+	}
+}
+
+func labelMatchersToProto(matchers []*labels.Matcher) []*LabelMatcher {
+	pbMatchers := make([]*LabelMatcher, 0, len(matchers))
+	for _, m := range matchers {
+		var mType MatchType
+		switch m.Type {
+		case labels.MatchEqual:
+			mType = MatchType_EQUAL
+		case labels.MatchNotEqual:
+			mType = MatchType_NOT_EQUAL
+		case labels.MatchRegexp:
+			mType = MatchType_REGEX_MATCH
+		case labels.MatchNotRegexp:
+			mType = MatchType_REGEX_NO_MATCH
+		default:
+			panic("invalid matcher type")
+		}
+		pbMatchers = append(pbMatchers, &LabelMatcher{
+			Type:  mType,
+			Name:  string(m.Name),
+			Value: string(m.Value),
+		})
+	}
+	return pbMatchers
+}
+
+func labelPairsToLabels(labelPairs []*LabelPair) labels.Labels {
+	result := make(labels.Labels, 0, len(labelPairs))
+	for _, l := range labelPairs {
+		result = append(result, labels.Label{
+			Name:  l.Name,
+			Value: l.Value,
+		})
+	}
+	sort.Sort(result)
+	return result
+}
+
+type byLabel []storage.Series
+
+func (a byLabel) Len() int           { return len(a) }
+func (a byLabel) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
+func (a byLabel) Less(i, j int) bool { return labels.Compare(a[i].Labels(), a[j].Labels()) < 0 }
+
+// LabelValues returns all potential values for a label name.
+func (q *querier) LabelValues(name string) ([]string, error) {
+	// TODO implement?
+	return nil, nil
+}
+
+// Close releases the resources of the Querier.
+func (q *querier) Close() error {
+	return nil
+}
+
+// errSeriesSet implements storage.SeriesSet, just returning an error.
+type errSeriesSet struct {
+	err error
+}
+
+func (errSeriesSet) Next() bool {
+	return false
+}
+
+func (errSeriesSet) At() storage.Series {
+	return nil
+}
+
+func (e errSeriesSet) Err() error {
+	return e.err
+}
+
+// concreteSeriesSet implements storage.SeriesSet.
+type concreteSeriesSet struct {
+	cur    int
+	series []storage.Series
+}
+
+func (c *concreteSeriesSet) Next() bool {
+	c.cur++
+	return c.cur-1 < len(c.series)
+}
+
+func (c *concreteSeriesSet) At() storage.Series {
+	return c.series[c.cur-1]
+}
+
+func (c *concreteSeriesSet) Err() error {
+	return nil
+}
+
+// concreteSeries implements storage.Series.
+type concreteSeries struct {
+	labels  labels.Labels
+	samples []*Sample
+}
+
+func (c *concreteSeries) Labels() labels.Labels {
+	return c.labels
+}
+
+func (c *concreteSeries) Iterator() storage.SeriesIterator {
+	return &concreteSeriesIterator{
+		series: c,
+	}
+}
+
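The iterator below seeks with a binary search over the time-sorted samples. A self-contained sketch of the same sort.Search pattern (the sample type and values are invented for illustration):

// Illustrative sketch: seeking to the first sample with timestamp >= t,
// as concreteSeriesIterator.Seek does below.
package main

import (
	"fmt"
	"sort"
)

type sample struct {
	timestampMs int64
	value       float64
}

func main() {
	samples := []sample{{100, 1}, {200, 2}, {300, 3}}
	seek := int64(150)
	// Index of the first sample at or after the seek time.
	i := sort.Search(len(samples), func(n int) bool {
		return samples[n].timestampMs >= seek
	})
	if i < len(samples) {
		fmt.Printf("seek(%d) -> t=%d v=%g\n", seek, samples[i].timestampMs, samples[i].value)
	} else {
		fmt.Println("seek past end of series")
	}
}
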
+// concreteSeriesIterator implements storage.SeriesIterator.
+type concreteSeriesIterator struct {
+	cur    int
+	series *concreteSeries
+}
+
+// Seek advances the iterator to the first sample with timestamp >= t.
+func (c *concreteSeriesIterator) Seek(t int64) bool {
+	c.cur = sort.Search(len(c.series.samples), func(n int) bool {
+		return c.series.samples[n].TimestampMs >= t
+	})
+	return c.cur < len(c.series.samples)
+}
+
+func (c *concreteSeriesIterator) At() (t int64, v float64) {
+	s := c.series.samples[c.cur]
+	return s.TimestampMs, s.Value
+}
+
+func (c *concreteSeriesIterator) Next() bool {
+	c.cur++
+	return c.cur < len(c.series.samples)
+}
+
+func (c *concreteSeriesIterator) Err() error {
+	return nil
+}
+
+// addExternalLabels adds matchers for each external label. External labels
+// that already have a corresponding user-supplied matcher are skipped, as we
+// assume that the user explicitly wants to select a different value for them.
+// We return the new set of matchers, along with a map of labels for which
+// matchers were added, so that these can later be removed from the result
+// time series again.
+func (q *querier) addExternalLabels(matchers []*labels.Matcher) ([]*labels.Matcher, model.LabelSet) {
+	el := make(model.LabelSet, len(q.externalLabels))
+	for k, v := range q.externalLabels {
+		el[k] = v
+	}
+	for _, m := range matchers {
+		if _, ok := el[model.LabelName(m.Name)]; ok {
+			delete(el, model.LabelName(m.Name))
+		}
+	}
+	for k, v := range el {
+		m, err := labels.NewMatcher(labels.MatchEqual, string(k), string(v))
+		if err != nil {
+			panic(err)
+		}
+		matchers = append(matchers, m)
+	}
+	return matchers, el
+}
+
+// removeLabels deletes the given labels from the label set and returns the
+// truncated set.
+func removeLabels(l labels.Labels, toDelete model.LabelSet) labels.Labels {
+	for i := 0; i < len(l); {
+		if _, ok := toDelete[model.LabelName(l[i].Name)]; ok {
+			l = l[:i+copy(l[i:], l[i+1:])]
+		} else {
+			i++
+		}
+	}
+	return l
+}
+
+//// MatrixToIterators returns series iterators for a given matrix.
+//func MatrixToIterators(m model.Matrix, err error) ([]local.SeriesIterator, error) {
+//	if err != nil {
+//		return nil, err
+//	}
+//
+//	its := make([]local.SeriesIterator, 0, len(m))
+//	for _, ss := range m {
+//		its = append(its, sampleStreamIterator{
+//			ss: ss,
+//		})
+//	}
+//	return its, nil
+//}
diff --git a/storage/remote/read_test.go b/storage/remote/read_test.go
new file mode 100644
index 000000000..596ced561
--- /dev/null
+++ b/storage/remote/read_test.go
@@ -0,0 +1,94 @@
+// Copyright 2017 The Prometheus Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+ +package remote + +import ( + "reflect" + "sort" + "testing" + + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/pkg/labels" +) + +func mustNewLabelMatcher(mt labels.MatchType, name, val string) *labels.Matcher { + m, err := labels.NewMatcher(mt, name, val) + if err != nil { + panic(err) + } + return m +} + +func TestAddExternalLabels(t *testing.T) { + tests := []struct { + el model.LabelSet + inMatchers []*labels.Matcher + outMatchers []*labels.Matcher + added model.LabelSet + }{ + { + el: model.LabelSet{}, + inMatchers: []*labels.Matcher{ + mustNewLabelMatcher(labels.MatchEqual, "job", "api-server"), + }, + outMatchers: []*labels.Matcher{ + mustNewLabelMatcher(labels.MatchEqual, "job", "api-server"), + }, + added: model.LabelSet{}, + }, + { + el: model.LabelSet{"region": "europe", "dc": "berlin-01"}, + inMatchers: []*labels.Matcher{ + mustNewLabelMatcher(labels.MatchEqual, "job", "api-server"), + }, + outMatchers: []*labels.Matcher{ + mustNewLabelMatcher(labels.MatchEqual, "job", "api-server"), + mustNewLabelMatcher(labels.MatchEqual, "region", "europe"), + mustNewLabelMatcher(labels.MatchEqual, "dc", "berlin-01"), + }, + added: model.LabelSet{"region": "europe", "dc": "berlin-01"}, + }, + { + el: model.LabelSet{"region": "europe", "dc": "berlin-01"}, + inMatchers: []*labels.Matcher{ + mustNewLabelMatcher(labels.MatchEqual, "job", "api-server"), + mustNewLabelMatcher(labels.MatchEqual, "dc", "munich-02"), + }, + outMatchers: []*labels.Matcher{ + mustNewLabelMatcher(labels.MatchEqual, "job", "api-server"), + mustNewLabelMatcher(labels.MatchEqual, "region", "europe"), + mustNewLabelMatcher(labels.MatchEqual, "dc", "munich-02"), + }, + added: model.LabelSet{"region": "europe"}, + }, + } + + for i, test := range tests { + q := querier{ + externalLabels: test.el, + } + + matchers, added := q.addExternalLabels(test.inMatchers) + + sort.Slice(test.outMatchers, func(i, j int) bool { return test.outMatchers[i].Name < test.outMatchers[j].Name }) + sort.Slice(matchers, func(i, j int) bool { return matchers[i].Name < matchers[j].Name }) + + if !reflect.DeepEqual(matchers, test.outMatchers) { + t.Fatalf("%d. unexpected matchers; want %v, got %v", i, test.outMatchers, matchers) + } + if !reflect.DeepEqual(added, test.added) { + t.Fatalf("%d. unexpected added labels; want %v, got %v", i, test.added, added) + } + } +} diff --git a/storage/remote/remote.pb.go b/storage/remote/remote.pb.go new file mode 100644 index 000000000..e7cbc0d15 --- /dev/null +++ b/storage/remote/remote.pb.go @@ -0,0 +1,312 @@ +// Code generated by protoc-gen-go. +// source: remote.proto +// DO NOT EDIT! + +/* +Package remote is a generated protocol buffer package. + +It is generated from these files: + remote.proto + +It has these top-level messages: + Sample + LabelPair + TimeSeries + WriteRequest + ReadRequest + ReadResponse + Query + LabelMatcher + QueryResult +*/ +package remote + +import proto "github.com/golang/protobuf/proto" +import fmt "fmt" +import math "math" + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. 
+const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package + +type MatchType int32 + +const ( + MatchType_EQUAL MatchType = 0 + MatchType_NOT_EQUAL MatchType = 1 + MatchType_REGEX_MATCH MatchType = 2 + MatchType_REGEX_NO_MATCH MatchType = 3 +) + +var MatchType_name = map[int32]string{ + 0: "EQUAL", + 1: "NOT_EQUAL", + 2: "REGEX_MATCH", + 3: "REGEX_NO_MATCH", +} +var MatchType_value = map[string]int32{ + "EQUAL": 0, + "NOT_EQUAL": 1, + "REGEX_MATCH": 2, + "REGEX_NO_MATCH": 3, +} + +func (x MatchType) String() string { + return proto.EnumName(MatchType_name, int32(x)) +} +func (MatchType) EnumDescriptor() ([]byte, []int) { return fileDescriptor0, []int{0} } + +type Sample struct { + Value float64 `protobuf:"fixed64,1,opt,name=value" json:"value,omitempty"` + TimestampMs int64 `protobuf:"varint,2,opt,name=timestamp_ms,json=timestampMs" json:"timestamp_ms,omitempty"` +} + +func (m *Sample) Reset() { *m = Sample{} } +func (m *Sample) String() string { return proto.CompactTextString(m) } +func (*Sample) ProtoMessage() {} +func (*Sample) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} } + +func (m *Sample) GetValue() float64 { + if m != nil { + return m.Value + } + return 0 +} + +func (m *Sample) GetTimestampMs() int64 { + if m != nil { + return m.TimestampMs + } + return 0 +} + +type LabelPair struct { + Name string `protobuf:"bytes,1,opt,name=name" json:"name,omitempty"` + Value string `protobuf:"bytes,2,opt,name=value" json:"value,omitempty"` +} + +func (m *LabelPair) Reset() { *m = LabelPair{} } +func (m *LabelPair) String() string { return proto.CompactTextString(m) } +func (*LabelPair) ProtoMessage() {} +func (*LabelPair) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{1} } + +func (m *LabelPair) GetName() string { + if m != nil { + return m.Name + } + return "" +} + +func (m *LabelPair) GetValue() string { + if m != nil { + return m.Value + } + return "" +} + +type TimeSeries struct { + Labels []*LabelPair `protobuf:"bytes,1,rep,name=labels" json:"labels,omitempty"` + // Sorted by time, oldest sample first. 
+ Samples []*Sample `protobuf:"bytes,2,rep,name=samples" json:"samples,omitempty"` +} + +func (m *TimeSeries) Reset() { *m = TimeSeries{} } +func (m *TimeSeries) String() string { return proto.CompactTextString(m) } +func (*TimeSeries) ProtoMessage() {} +func (*TimeSeries) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{2} } + +func (m *TimeSeries) GetLabels() []*LabelPair { + if m != nil { + return m.Labels + } + return nil +} + +func (m *TimeSeries) GetSamples() []*Sample { + if m != nil { + return m.Samples + } + return nil +} + +type WriteRequest struct { + Timeseries []*TimeSeries `protobuf:"bytes,1,rep,name=timeseries" json:"timeseries,omitempty"` +} + +func (m *WriteRequest) Reset() { *m = WriteRequest{} } +func (m *WriteRequest) String() string { return proto.CompactTextString(m) } +func (*WriteRequest) ProtoMessage() {} +func (*WriteRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{3} } + +func (m *WriteRequest) GetTimeseries() []*TimeSeries { + if m != nil { + return m.Timeseries + } + return nil +} + +type ReadRequest struct { + Queries []*Query `protobuf:"bytes,1,rep,name=queries" json:"queries,omitempty"` +} + +func (m *ReadRequest) Reset() { *m = ReadRequest{} } +func (m *ReadRequest) String() string { return proto.CompactTextString(m) } +func (*ReadRequest) ProtoMessage() {} +func (*ReadRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{4} } + +func (m *ReadRequest) GetQueries() []*Query { + if m != nil { + return m.Queries + } + return nil +} + +type ReadResponse struct { + // In same order as the request's queries. + Results []*QueryResult `protobuf:"bytes,1,rep,name=results" json:"results,omitempty"` +} + +func (m *ReadResponse) Reset() { *m = ReadResponse{} } +func (m *ReadResponse) String() string { return proto.CompactTextString(m) } +func (*ReadResponse) ProtoMessage() {} +func (*ReadResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{5} } + +func (m *ReadResponse) GetResults() []*QueryResult { + if m != nil { + return m.Results + } + return nil +} + +type Query struct { + StartTimestampMs int64 `protobuf:"varint,1,opt,name=start_timestamp_ms,json=startTimestampMs" json:"start_timestamp_ms,omitempty"` + EndTimestampMs int64 `protobuf:"varint,2,opt,name=end_timestamp_ms,json=endTimestampMs" json:"end_timestamp_ms,omitempty"` + Matchers []*LabelMatcher `protobuf:"bytes,3,rep,name=matchers" json:"matchers,omitempty"` +} + +func (m *Query) Reset() { *m = Query{} } +func (m *Query) String() string { return proto.CompactTextString(m) } +func (*Query) ProtoMessage() {} +func (*Query) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{6} } + +func (m *Query) GetStartTimestampMs() int64 { + if m != nil { + return m.StartTimestampMs + } + return 0 +} + +func (m *Query) GetEndTimestampMs() int64 { + if m != nil { + return m.EndTimestampMs + } + return 0 +} + +func (m *Query) GetMatchers() []*LabelMatcher { + if m != nil { + return m.Matchers + } + return nil +} + +type LabelMatcher struct { + Type MatchType `protobuf:"varint,1,opt,name=type,enum=remote.MatchType" json:"type,omitempty"` + Name string `protobuf:"bytes,2,opt,name=name" json:"name,omitempty"` + Value string `protobuf:"bytes,3,opt,name=value" json:"value,omitempty"` +} + +func (m *LabelMatcher) Reset() { *m = LabelMatcher{} } +func (m *LabelMatcher) String() string { return proto.CompactTextString(m) } +func (*LabelMatcher) ProtoMessage() {} +func (*LabelMatcher) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{7} } + +func 
(m *LabelMatcher) GetType() MatchType { + if m != nil { + return m.Type + } + return MatchType_EQUAL +} + +func (m *LabelMatcher) GetName() string { + if m != nil { + return m.Name + } + return "" +} + +func (m *LabelMatcher) GetValue() string { + if m != nil { + return m.Value + } + return "" +} + +type QueryResult struct { + Timeseries []*TimeSeries `protobuf:"bytes,1,rep,name=timeseries" json:"timeseries,omitempty"` +} + +func (m *QueryResult) Reset() { *m = QueryResult{} } +func (m *QueryResult) String() string { return proto.CompactTextString(m) } +func (*QueryResult) ProtoMessage() {} +func (*QueryResult) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{8} } + +func (m *QueryResult) GetTimeseries() []*TimeSeries { + if m != nil { + return m.Timeseries + } + return nil +} + +func init() { + proto.RegisterType((*Sample)(nil), "remote.Sample") + proto.RegisterType((*LabelPair)(nil), "remote.LabelPair") + proto.RegisterType((*TimeSeries)(nil), "remote.TimeSeries") + proto.RegisterType((*WriteRequest)(nil), "remote.WriteRequest") + proto.RegisterType((*ReadRequest)(nil), "remote.ReadRequest") + proto.RegisterType((*ReadResponse)(nil), "remote.ReadResponse") + proto.RegisterType((*Query)(nil), "remote.Query") + proto.RegisterType((*LabelMatcher)(nil), "remote.LabelMatcher") + proto.RegisterType((*QueryResult)(nil), "remote.QueryResult") + proto.RegisterEnum("remote.MatchType", MatchType_name, MatchType_value) +} + +func init() { proto.RegisterFile("remote.proto", fileDescriptor0) } + +var fileDescriptor0 = []byte{ + // 424 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x53, 0xc1, 0x6e, 0xd3, 0x40, + 0x10, 0x65, 0xe3, 0x26, 0xc1, 0x63, 0x37, 0x84, 0xa1, 0x87, 0x1c, 0xc3, 0x4a, 0x08, 0x83, 0xa0, + 0x42, 0x45, 0x70, 0xe3, 0x10, 0x50, 0x04, 0x42, 0x4d, 0x4b, 0xb7, 0x46, 0x70, 0xb3, 0xb6, 0x64, + 0x24, 0x2c, 0x79, 0x13, 0x77, 0x77, 0x8d, 0x94, 0xcf, 0xe0, 0x8f, 0x51, 0x76, 0xb3, 0x8e, 0x23, + 0xe5, 0xc4, 0x2d, 0x33, 0xef, 0xbd, 0x99, 0x97, 0x7d, 0x63, 0x48, 0x35, 0xa9, 0xb5, 0xa5, 0xf3, + 0x5a, 0xaf, 0xed, 0x1a, 0x07, 0xbe, 0xe2, 0x33, 0x18, 0xdc, 0x4a, 0x55, 0x57, 0x84, 0x67, 0xd0, + 0xff, 0x23, 0xab, 0x86, 0x26, 0x6c, 0xca, 0x32, 0x26, 0x7c, 0x81, 0x4f, 0x21, 0xb5, 0xa5, 0x22, + 0x63, 0xa5, 0xaa, 0x0b, 0x65, 0x26, 0xbd, 0x29, 0xcb, 0x22, 0x91, 0xb4, 0xbd, 0x85, 0xe1, 0xef, + 0x20, 0xbe, 0x94, 0x77, 0x54, 0x7d, 0x93, 0xa5, 0x46, 0x84, 0x93, 0x95, 0x54, 0x7e, 0x48, 0x2c, + 0xdc, 0xef, 0xfd, 0xe4, 0x9e, 0x6b, 0xfa, 0x82, 0x4b, 0x80, 0xbc, 0x54, 0x74, 0x4b, 0xba, 0x24, + 0x83, 0x2f, 0x60, 0x50, 0x6d, 0x87, 0x98, 0x09, 0x9b, 0x46, 0x59, 0x72, 0xf1, 0xf8, 0x7c, 0x67, + 0xb7, 0x1d, 0x2d, 0x76, 0x04, 0xcc, 0x60, 0x68, 0x9c, 0xe5, 0xad, 0x9b, 0x2d, 0x77, 0x14, 0xb8, + 0xfe, 0x9f, 0x88, 0x00, 0xf3, 0x8f, 0x90, 0xfe, 0xd0, 0xa5, 0x25, 0x41, 0xf7, 0x0d, 0x19, 0x8b, + 0x17, 0x00, 0xce, 0xb8, 0x5b, 0xb9, 0x5b, 0x84, 0x41, 0xbc, 0x37, 0x23, 0x3a, 0x2c, 0xfe, 0x1e, + 0x12, 0x41, 0x72, 0x19, 0x46, 0x3c, 0x87, 0xe1, 0x7d, 0xd3, 0xd5, 0x9f, 0x06, 0xfd, 0x4d, 0x43, + 0x7a, 0x23, 0x02, 0xca, 0x3f, 0x40, 0xea, 0x75, 0xa6, 0x5e, 0xaf, 0x0c, 0xe1, 0x6b, 0x18, 0x6a, + 0x32, 0x4d, 0x65, 0x83, 0xf0, 0xc9, 0xa1, 0xd0, 0x61, 0x22, 0x70, 0xf8, 0x5f, 0x06, 0x7d, 0x07, + 0xe0, 0x2b, 0x40, 0x63, 0xa5, 0xb6, 0xc5, 0x41, 0x0e, 0xcc, 0xe5, 0x30, 0x76, 0x48, 0xbe, 0x0f, + 0x03, 0x33, 0x18, 0xd3, 0x6a, 0x59, 0x1c, 0xc9, 0x6c, 0x44, 0xab, 0x65, 0x97, 0xf9, 0x06, 0x1e, + 0x2a, 0x69, 0x7f, 0xfd, 0x26, 0x6d, 0x26, 0x91, 0x73, 0x74, 0x76, 0xf0, 0xe6, 0x0b, 0x0f, 
0x8a,
+	0x96, 0xc5, 0x0b, 0x48, 0xbb, 0x08, 0x3e, 0x83, 0x13, 0xbb, 0xa9, 0x7d, 0xd6, 0xa3, 0x7d, 0x62,
+	0x0e, 0xce, 0x37, 0x35, 0x09, 0x07, 0xb7, 0x27, 0xd1, 0x3b, 0x76, 0x12, 0x51, 0xf7, 0x24, 0x66,
+	0x90, 0x74, 0x1e, 0xe3, 0x7f, 0xe2, 0x7a, 0xf9, 0x15, 0xe2, 0x76, 0x3f, 0xc6, 0xd0, 0x9f, 0xdf,
+	0x7c, 0x9f, 0x5d, 0x8e, 0x1f, 0xe0, 0x29, 0xc4, 0x57, 0xd7, 0x79, 0xe1, 0x4b, 0x86, 0x8f, 0x20,
+	0x11, 0xf3, 0xcf, 0xf3, 0x9f, 0xc5, 0x62, 0x96, 0x7f, 0xfa, 0x32, 0xee, 0x21, 0xc2, 0xc8, 0x37,
+	0xae, 0xae, 0x77, 0xbd, 0xe8, 0x6e, 0xe0, 0x3e, 0x95, 0xb7, 0xff, 0x02, 0x00, 0x00, 0xff, 0xff,
+	0x9b, 0x9e, 0x76, 0xb3, 0x3a, 0x03, 0x00, 0x00,
+}
diff --git a/storage/remote/remote.proto b/storage/remote/remote.proto
new file mode 100644
index 000000000..6f09c9efc
--- /dev/null
+++ b/storage/remote/remote.proto
@@ -0,0 +1,68 @@
+// Copyright 2016 Prometheus Team
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+syntax = "proto3";
+
+package remote;
+
+message Sample {
+  double value = 1;
+  int64 timestamp_ms = 2;
+}
+
+message LabelPair {
+  string name = 1;
+  string value = 2;
+}
+
+message TimeSeries {
+  repeated LabelPair labels = 1;
+  // Sorted by time, oldest sample first.
+  repeated Sample samples = 2;
+}
+
+message WriteRequest {
+  repeated TimeSeries timeseries = 1;
+}
+
+message ReadRequest {
+  repeated Query queries = 1;
+}
+
+message ReadResponse {
+  // In same order as the request's queries.
+  repeated QueryResult results = 1;
+}
+
+message Query {
+  int64 start_timestamp_ms = 1;
+  int64 end_timestamp_ms = 2;
+  repeated LabelMatcher matchers = 3;
+}
+
+enum MatchType {
+  EQUAL = 0;
+  NOT_EQUAL = 1;
+  REGEX_MATCH = 2;
+  REGEX_NO_MATCH = 3;
+}
+
+message LabelMatcher {
+  MatchType type = 1;
+  string name = 2;
+  string value = 3;
+}
+
+message QueryResult {
+  repeated TimeSeries timeseries = 1;
+}
diff --git a/storage/remote/storage.go b/storage/remote/storage.go
new file mode 100644
index 000000000..d0449995d
--- /dev/null
+++ b/storage/remote/storage.go
@@ -0,0 +1,104 @@
+// Copyright 2017 The Prometheus Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package remote
+
+import (
+	"sync"
+
+	"github.com/prometheus/common/model"
+	"github.com/prometheus/prometheus/config"
+)
+
+// Storage represents all the remote read and write endpoints.
+type Storage struct {
+	mtx sync.RWMutex
+
+	// For writes
+	queues []*QueueManager
+
+	// For reads
+	clients        []*Client
+	externalLabels model.LabelSet
+}
+
+// ApplyConfig updates the state as the new config requires.
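+// Write queues are currently stopped and recreated wholesale on every reload;
+// see the TODO in the body about only restarting queues whose config changed.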
+func (s *Storage) ApplyConfig(conf *config.Config) error {
+	s.mtx.Lock()
+	defer s.mtx.Unlock()
+
+	// Update write queues
+
+	newQueues := []*QueueManager{}
+	// TODO: we should only stop & recreate queues which have changes,
+	// as this can be quite disruptive.
+	for i, rwConf := range conf.RemoteWriteConfigs {
+		c, err := NewClient(i, &clientConfig{
+			url:              rwConf.URL,
+			timeout:          rwConf.RemoteTimeout,
+			httpClientConfig: rwConf.HTTPClientConfig,
+		})
+		if err != nil {
+			return err
+		}
+		newQueues = append(newQueues, NewQueueManager(
+			defaultQueueManagerConfig,
+			conf.GlobalConfig.ExternalLabels,
+			rwConf.WriteRelabelConfigs,
+			c,
+		))
+	}
+
+	for _, q := range s.queues {
+		q.Stop()
+	}
+
+	s.queues = newQueues
+	for _, q := range s.queues {
+		q.Start()
+	}
+
+	// Update read clients
+
+	clients := []*Client{}
+	for i, rrConf := range conf.RemoteReadConfigs {
+		c, err := NewClient(i, &clientConfig{
+			url:              rrConf.URL,
+			timeout:          rrConf.RemoteTimeout,
+			httpClientConfig: rrConf.HTTPClientConfig,
+		})
+		if err != nil {
+			return err
+		}
+		clients = append(clients, c)
+	}
+
+	s.clients = clients
+	s.externalLabels = conf.GlobalConfig.ExternalLabels
+
+	return nil
+}
+
+// Close stops the background processing of the storage queues.
+func (s *Storage) Close() error {
+	s.mtx.Lock()
+	defer s.mtx.Unlock()
+
+	for _, q := range s.queues {
+		q.Stop()
+	}
+
+	return nil
+}
diff --git a/storage/remote/write.go b/storage/remote/write.go
new file mode 100644
index 000000000..f69561bc7
--- /dev/null
+++ b/storage/remote/write.go
@@ -0,0 +1,63 @@
+// Copyright 2017 The Prometheus Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package remote
+
+import (
+	"github.com/prometheus/common/model"
+	"github.com/prometheus/prometheus/pkg/labels"
+	"github.com/prometheus/prometheus/storage"
+)
+
+// Appender implements storage.Storage.
+func (s *Storage) Appender() (storage.Appender, error) {
+	return s, nil
+}
+
+// Add queues a sample for all configured remote write endpoints.
+func (s *Storage) Add(l labels.Labels, t int64, v float64) (string, error) {
+	s.mtx.RLock()
+	defer s.mtx.RUnlock()
+	for _, q := range s.queues {
+		q.Append(&model.Sample{
+			Metric:    labelsToMetric(l),
+			Timestamp: model.Time(t),
+			Value:     model.SampleValue(v),
+		})
+	}
+	return "", nil
+}
+
+func labelsToMetric(ls labels.Labels) model.Metric {
+	metric := make(model.Metric, len(ls))
+	for _, l := range ls {
+		metric[model.LabelName(l.Name)] = model.LabelValue(l.Value)
+	}
+	return metric
+}
+
+// AddFast always returns storage.ErrNotFound; remote storage keeps no series
+// references, so callers must fall back to the label-based Add path.
+func (*Storage) AddFast(ref string, t int64, v float64) error {
+	return storage.ErrNotFound
+}
+
+// Commit is a no-op; samples are sent asynchronously by the queue managers.
+func (*Storage) Commit() error {
+	return nil
+}
+
+// Rollback is a no-op; queued samples cannot be recalled.
+func (*Storage) Rollback() error {
+	return nil
+}
diff --git a/vendor/golang.org/x/time/LICENSE b/vendor/golang.org/x/time/LICENSE
new file mode 100644
index 000000000..6a66aea5e
--- /dev/null
+++ b/vendor/golang.org/x/time/LICENSE
@@ -0,0 +1,27 @@
+Copyright (c) 2009 The Go Authors. All rights reserved.
+ +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are +met: + + * Redistributions of source code must retain the above copyright +notice, this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above +copyright notice, this list of conditions and the following disclaimer +in the documentation and/or other materials provided with the +distribution. + * Neither the name of Google Inc. nor the names of its +contributors may be used to endorse or promote products derived from +this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/vendor/golang.org/x/time/PATENTS b/vendor/golang.org/x/time/PATENTS new file mode 100644 index 000000000..733099041 --- /dev/null +++ b/vendor/golang.org/x/time/PATENTS @@ -0,0 +1,22 @@ +Additional IP Rights Grant (Patents) + +"This implementation" means the copyrightable works distributed by +Google as part of the Go project. + +Google hereby grants to You a perpetual, worldwide, non-exclusive, +no-charge, royalty-free, irrevocable (except as stated in this section) +patent license to make, have made, use, offer to sell, sell, import, +transfer and otherwise run, modify and propagate the contents of this +implementation of Go, where such license applies only to those patent +claims, both currently owned or controlled by Google and acquired in +the future, licensable by Google that are necessarily infringed by this +implementation of Go. This grant does not include claims that would be +infringed only as a consequence of further modification of this +implementation. If you or your agent or exclusive licensee institute or +order or agree to the institution of patent litigation against any +entity (including a cross-claim or counterclaim in a lawsuit) alleging +that this implementation of Go or any code incorporated within this +implementation of Go constitutes direct or contributory patent +infringement, or inducement of patent infringement, then any patent +rights granted to you under this License for this implementation of Go +shall terminate as of the date such litigation is filed. diff --git a/vendor/golang.org/x/time/rate/rate.go b/vendor/golang.org/x/time/rate/rate.go new file mode 100644 index 000000000..eabcd1147 --- /dev/null +++ b/vendor/golang.org/x/time/rate/rate.go @@ -0,0 +1,380 @@ +// Copyright 2015 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// Package rate provides a rate limiter. +package rate + +import ( + "fmt" + "math" + "sync" + "time" +) + +// Limit defines the maximum frequency of some events. 
+// Limit is represented as number of events per second. +// A zero Limit allows no events. +type Limit float64 + +// Inf is the infinite rate limit; it allows all events (even if burst is zero). +const Inf = Limit(math.MaxFloat64) + +// Every converts a minimum time interval between events to a Limit. +func Every(interval time.Duration) Limit { + if interval <= 0 { + return Inf + } + return 1 / Limit(interval.Seconds()) +} + +// A Limiter controls how frequently events are allowed to happen. +// It implements a "token bucket" of size b, initially full and refilled +// at rate r tokens per second. +// Informally, in any large enough time interval, the Limiter limits the +// rate to r tokens per second, with a maximum burst size of b events. +// As a special case, if r == Inf (the infinite rate), b is ignored. +// See https://en.wikipedia.org/wiki/Token_bucket for more about token buckets. +// +// The zero value is a valid Limiter, but it will reject all events. +// Use NewLimiter to create non-zero Limiters. +// +// Limiter has three main methods, Allow, Reserve, and Wait. +// Most callers should use Wait. +// +// Each of the three methods consumes a single token. +// They differ in their behavior when no token is available. +// If no token is available, Allow returns false. +// If no token is available, Reserve returns a reservation for a future token +// and the amount of time the caller must wait before using it. +// If no token is available, Wait blocks until one can be obtained +// or its associated context.Context is canceled. +// +// The methods AllowN, ReserveN, and WaitN consume n tokens. +type Limiter struct { + limit Limit + burst int + + mu sync.Mutex + tokens float64 + // last is the last time the limiter's tokens field was updated + last time.Time + // lastEvent is the latest time of a rate-limited event (past or future) + lastEvent time.Time +} + +// Limit returns the maximum overall event rate. +func (lim *Limiter) Limit() Limit { + lim.mu.Lock() + defer lim.mu.Unlock() + return lim.limit +} + +// Burst returns the maximum burst size. Burst is the maximum number of tokens +// that can be consumed in a single call to Allow, Reserve, or Wait, so higher +// Burst values allow more events to happen at once. +// A zero Burst allows no events, unless limit == Inf. +func (lim *Limiter) Burst() int { + return lim.burst +} + +// NewLimiter returns a new Limiter that allows events up to rate r and permits +// bursts of at most b tokens. +func NewLimiter(r Limit, b int) *Limiter { + return &Limiter{ + limit: r, + burst: b, + } +} + +// Allow is shorthand for AllowN(time.Now(), 1). +func (lim *Limiter) Allow() bool { + return lim.AllowN(time.Now(), 1) +} + +// AllowN reports whether n events may happen at time now. +// Use this method if you intend to drop / skip events that exceed the rate limit. +// Otherwise use Reserve or Wait. +func (lim *Limiter) AllowN(now time.Time, n int) bool { + return lim.reserveN(now, n, 0).ok +} + +// A Reservation holds information about events that are permitted by a Limiter to happen after a delay. +// A Reservation may be canceled, which may enable the Limiter to permit additional events. +type Reservation struct { + ok bool + lim *Limiter + tokens int + timeToAct time.Time + // This is the Limit at reservation time, it can change later. + limit Limit +} + +// OK returns whether the limiter can provide the requested number of tokens +// within the maximum wait time. If OK is false, Delay returns InfDuration, and +// Cancel does nothing. 
+func (r *Reservation) OK() bool { + return r.ok +} + +// Delay is shorthand for DelayFrom(time.Now()). +func (r *Reservation) Delay() time.Duration { + return r.DelayFrom(time.Now()) +} + +// InfDuration is the duration returned by Delay when a Reservation is not OK. +const InfDuration = time.Duration(1<<63 - 1) + +// DelayFrom returns the duration for which the reservation holder must wait +// before taking the reserved action. Zero duration means act immediately. +// InfDuration means the limiter cannot grant the tokens requested in this +// Reservation within the maximum wait time. +func (r *Reservation) DelayFrom(now time.Time) time.Duration { + if !r.ok { + return InfDuration + } + delay := r.timeToAct.Sub(now) + if delay < 0 { + return 0 + } + return delay +} + +// Cancel is shorthand for CancelAt(time.Now()). +func (r *Reservation) Cancel() { + r.CancelAt(time.Now()) + return +} + +// CancelAt indicates that the reservation holder will not perform the reserved action +// and reverses the effects of this Reservation on the rate limit as much as possible, +// considering that other reservations may have already been made. +func (r *Reservation) CancelAt(now time.Time) { + if !r.ok { + return + } + + r.lim.mu.Lock() + defer r.lim.mu.Unlock() + + if r.lim.limit == Inf || r.tokens == 0 || r.timeToAct.Before(now) { + return + } + + // calculate tokens to restore + // The duration between lim.lastEvent and r.timeToAct tells us how many tokens were reserved + // after r was obtained. These tokens should not be restored. + restoreTokens := float64(r.tokens) - r.limit.tokensFromDuration(r.lim.lastEvent.Sub(r.timeToAct)) + if restoreTokens <= 0 { + return + } + // advance time to now + now, _, tokens := r.lim.advance(now) + // calculate new number of tokens + tokens += restoreTokens + if burst := float64(r.lim.burst); tokens > burst { + tokens = burst + } + // update state + r.lim.last = now + r.lim.tokens = tokens + if r.timeToAct == r.lim.lastEvent { + prevEvent := r.timeToAct.Add(r.limit.durationFromTokens(float64(-r.tokens))) + if !prevEvent.Before(now) { + r.lim.lastEvent = prevEvent + } + } + + return +} + +// Reserve is shorthand for ReserveN(time.Now(), 1). +func (lim *Limiter) Reserve() *Reservation { + return lim.ReserveN(time.Now(), 1) +} + +// ReserveN returns a Reservation that indicates how long the caller must wait before n events happen. +// The Limiter takes this Reservation into account when allowing future events. +// ReserveN returns false if n exceeds the Limiter's burst size. +// Usage example: +// r := lim.ReserveN(time.Now(), 1) +// if !r.OK() { +// // Not allowed to act! Did you remember to set lim.burst to be > 0 ? +// return +// } +// time.Sleep(r.Delay()) +// Act() +// Use this method if you wish to wait and slow down in accordance with the rate limit without dropping events. +// If you need to respect a deadline or cancel the delay, use Wait instead. +// To drop or skip events exceeding rate limit, use Allow instead. +func (lim *Limiter) ReserveN(now time.Time, n int) *Reservation { + r := lim.reserveN(now, n, InfDuration) + return &r +} + +// contextContext is a temporary(?) copy of the context.Context type +// to support both Go 1.6 using golang.org/x/net/context and Go 1.7+ +// with the built-in context package. If people ever stop using Go 1.6 +// we can remove this. 
+type contextContext interface { + Deadline() (deadline time.Time, ok bool) + Done() <-chan struct{} + Err() error + Value(key interface{}) interface{} +} + +// Wait is shorthand for WaitN(ctx, 1). +func (lim *Limiter) wait(ctx contextContext) (err error) { + return lim.WaitN(ctx, 1) +} + +// WaitN blocks until lim permits n events to happen. +// It returns an error if n exceeds the Limiter's burst size, the Context is +// canceled, or the expected wait time exceeds the Context's Deadline. +// The burst limit is ignored if the rate limit is Inf. +func (lim *Limiter) waitN(ctx contextContext, n int) (err error) { + if n > lim.burst && lim.limit != Inf { + return fmt.Errorf("rate: Wait(n=%d) exceeds limiter's burst %d", n, lim.burst) + } + // Check if ctx is already cancelled + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + // Determine wait limit + now := time.Now() + waitLimit := InfDuration + if deadline, ok := ctx.Deadline(); ok { + waitLimit = deadline.Sub(now) + } + // Reserve + r := lim.reserveN(now, n, waitLimit) + if !r.ok { + return fmt.Errorf("rate: Wait(n=%d) would exceed context deadline", n) + } + // Wait + t := time.NewTimer(r.DelayFrom(now)) + defer t.Stop() + select { + case <-t.C: + // We can proceed. + return nil + case <-ctx.Done(): + // Context was canceled before we could proceed. Cancel the + // reservation, which may permit other events to proceed sooner. + r.Cancel() + return ctx.Err() + } +} + +// SetLimit is shorthand for SetLimitAt(time.Now(), newLimit). +func (lim *Limiter) SetLimit(newLimit Limit) { + lim.SetLimitAt(time.Now(), newLimit) +} + +// SetLimitAt sets a new Limit for the limiter. The new Limit, and Burst, may be violated +// or underutilized by those which reserved (using Reserve or Wait) but did not yet act +// before SetLimitAt was called. +func (lim *Limiter) SetLimitAt(now time.Time, newLimit Limit) { + lim.mu.Lock() + defer lim.mu.Unlock() + + now, _, tokens := lim.advance(now) + + lim.last = now + lim.tokens = tokens + lim.limit = newLimit +} + +// reserveN is a helper method for AllowN, ReserveN, and WaitN. +// maxFutureReserve specifies the maximum reservation wait duration allowed. +// reserveN returns Reservation, not *Reservation, to avoid allocation in AllowN and WaitN. +func (lim *Limiter) reserveN(now time.Time, n int, maxFutureReserve time.Duration) Reservation { + lim.mu.Lock() + + if lim.limit == Inf { + lim.mu.Unlock() + return Reservation{ + ok: true, + lim: lim, + tokens: n, + timeToAct: now, + } + } + + now, last, tokens := lim.advance(now) + + // Calculate the remaining number of tokens resulting from the request. + tokens -= float64(n) + + // Calculate the wait duration + var waitDuration time.Duration + if tokens < 0 { + waitDuration = lim.limit.durationFromTokens(-tokens) + } + + // Decide result + ok := n <= lim.burst && waitDuration <= maxFutureReserve + + // Prepare reservation + r := Reservation{ + ok: ok, + lim: lim, + limit: lim.limit, + } + if ok { + r.tokens = n + r.timeToAct = now.Add(waitDuration) + } + + // Update state + if ok { + lim.last = now + lim.tokens = tokens + lim.lastEvent = r.timeToAct + } else { + lim.last = last + } + + lim.mu.Unlock() + return r +} + +// advance calculates and returns an updated state for lim resulting from the passage of time. +// lim is not changed. 
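+// advance requires that lim.mu is held.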
+func (lim *Limiter) advance(now time.Time) (newNow time.Time, newLast time.Time, newTokens float64) { + last := lim.last + if now.Before(last) { + last = now + } + + // Avoid making delta overflow below when last is very old. + maxElapsed := lim.limit.durationFromTokens(float64(lim.burst) - lim.tokens) + elapsed := now.Sub(last) + if elapsed > maxElapsed { + elapsed = maxElapsed + } + + // Calculate the new number of tokens, due to time that passed. + delta := lim.limit.tokensFromDuration(elapsed) + tokens := lim.tokens + delta + if burst := float64(lim.burst); tokens > burst { + tokens = burst + } + + return now, last, tokens +} + +// durationFromTokens is a unit conversion function from the number of tokens to the duration +// of time it takes to accumulate them at a rate of limit tokens per second. +func (limit Limit) durationFromTokens(tokens float64) time.Duration { + seconds := tokens / float64(limit) + return time.Nanosecond * time.Duration(1e9*seconds) +} + +// tokensFromDuration is a unit conversion function from a time duration to the number of tokens +// which could be accumulated during that duration at a rate of limit tokens per second. +func (limit Limit) tokensFromDuration(d time.Duration) float64 { + return d.Seconds() * float64(limit) +} diff --git a/vendor/golang.org/x/time/rate/rate_go16.go b/vendor/golang.org/x/time/rate/rate_go16.go new file mode 100644 index 000000000..6bab1850f --- /dev/null +++ b/vendor/golang.org/x/time/rate/rate_go16.go @@ -0,0 +1,21 @@ +// Copyright 2017 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// +build !go1.7 + +package rate + +import "golang.org/x/net/context" + +// Wait is shorthand for WaitN(ctx, 1). +func (lim *Limiter) Wait(ctx context.Context) (err error) { + return lim.waitN(ctx, 1) +} + +// WaitN blocks until lim permits n events to happen. +// It returns an error if n exceeds the Limiter's burst size, the Context is +// canceled, or the expected wait time exceeds the Context's Deadline. +func (lim *Limiter) WaitN(ctx context.Context, n int) (err error) { + return lim.waitN(ctx, n) +} diff --git a/vendor/golang.org/x/time/rate/rate_go17.go b/vendor/golang.org/x/time/rate/rate_go17.go new file mode 100644 index 000000000..f90d85f51 --- /dev/null +++ b/vendor/golang.org/x/time/rate/rate_go17.go @@ -0,0 +1,21 @@ +// Copyright 2017 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// +build go1.7 + +package rate + +import "context" + +// Wait is shorthand for WaitN(ctx, 1). +func (lim *Limiter) Wait(ctx context.Context) (err error) { + return lim.waitN(ctx, 1) +} + +// WaitN blocks until lim permits n events to happen. +// It returns an error if n exceeds the Limiter's burst size, the Context is +// canceled, or the expected wait time exceeds the Context's Deadline. 
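+// The burst limit is ignored if the rate limit is Inf.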
+func (lim *Limiter) WaitN(ctx context.Context, n int) (err error) { + return lim.waitN(ctx, n) +} diff --git a/vendor/vendor.json b/vendor/vendor.json index ce1255a59..0a304bb89 100644 --- a/vendor/vendor.json +++ b/vendor/vendor.json @@ -1097,6 +1097,12 @@ "revision": "c589d0c9f0d81640c518354c7bcae77d99820aa3", "revisionTime": "2016-09-30T00:14:02Z" }, + { + "checksumSHA1": "vGfePfr0+weQUeTM/71mu+LCFuE=", + "path": "golang.org/x/time/rate", + "revision": "8be79e1e0910c292df4e79c241bb7e8f7e725959", + "revisionTime": "2017-04-24T23:28:54Z" + }, { "checksumSHA1": "AjdmRXf0fiy6Bec9mNlsGsmZi1k=", "path": "google.golang.org/api/compute/v1",