Merge pull request #2499 from prometheus/remote-read

Remote Read
This commit is contained in:
Julius Volz 2017-03-27 14:43:44 +02:00 committed by GitHub
commit b5b0e00923
16 changed files with 1549 additions and 128 deletions

View file

@ -34,6 +34,7 @@ import (
"github.com/prometheus/prometheus/retrieval" "github.com/prometheus/prometheus/retrieval"
"github.com/prometheus/prometheus/rules" "github.com/prometheus/prometheus/rules"
"github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/storage/fanin"
"github.com/prometheus/prometheus/storage/local" "github.com/prometheus/prometheus/storage/local"
"github.com/prometheus/prometheus/storage/remote" "github.com/prometheus/prometheus/storage/remote"
"github.com/prometheus/prometheus/web" "github.com/prometheus/prometheus/web"
@ -92,14 +93,20 @@ func Main() int {
return 1 return 1
} }
remoteStorage := &remote.Storage{} remoteAppender := &remote.Writer{}
sampleAppender = append(sampleAppender, remoteStorage) sampleAppender = append(sampleAppender, remoteAppender)
reloadables = append(reloadables, remoteStorage) remoteReader := &remote.Reader{}
reloadables = append(reloadables, remoteAppender, remoteReader)
queryable := fanin.Queryable{
Local: localStorage,
Remote: remoteReader,
}
var ( var (
notifier = notifier.New(&cfg.notifier) notifier = notifier.New(&cfg.notifier)
targetManager = retrieval.NewTargetManager(sampleAppender) targetManager = retrieval.NewTargetManager(sampleAppender)
queryEngine = promql.NewEngine(localStorage, &cfg.queryEngine) queryEngine = promql.NewEngine(queryable, &cfg.queryEngine)
ctx, cancelCtx = context.WithCancel(context.Background()) ctx, cancelCtx = context.WithCancel(context.Background())
) )
@ -107,7 +114,7 @@ func Main() int {
SampleAppender: sampleAppender, SampleAppender: sampleAppender,
Notifier: notifier, Notifier: notifier,
QueryEngine: queryEngine, QueryEngine: queryEngine,
Context: ctx, Context: fanin.WithLocalOnly(ctx),
ExternalURL: cfg.web.ExternalURL, ExternalURL: cfg.web.ExternalURL,
}) })
@ -178,7 +185,7 @@ func Main() int {
} }
}() }()
defer remoteStorage.Stop() defer remoteAppender.Stop()
// The storage has to be fully initialized before registering. // The storage has to be fully initialized before registering.
if instrumentedStorage, ok := localStorage.(prometheus.Collector); ok { if instrumentedStorage, ok := localStorage.(prometheus.Collector); ok {

View file

@ -167,6 +167,11 @@ var (
DefaultRemoteWriteConfig = RemoteWriteConfig{ DefaultRemoteWriteConfig = RemoteWriteConfig{
RemoteTimeout: model.Duration(30 * time.Second), RemoteTimeout: model.Duration(30 * time.Second),
} }
// DefaultRemoteReadConfig is the default remote read configuration.
DefaultRemoteReadConfig = RemoteReadConfig{
RemoteTimeout: model.Duration(1 * time.Minute),
}
) )
// URL is a custom URL type that allows validation at configuration load time. // URL is a custom URL type that allows validation at configuration load time.
@ -205,6 +210,7 @@ type Config struct {
ScrapeConfigs []*ScrapeConfig `yaml:"scrape_configs,omitempty"` ScrapeConfigs []*ScrapeConfig `yaml:"scrape_configs,omitempty"`
RemoteWriteConfigs []*RemoteWriteConfig `yaml:"remote_write,omitempty"` RemoteWriteConfigs []*RemoteWriteConfig `yaml:"remote_write,omitempty"`
RemoteReadConfigs []*RemoteReadConfig `yaml:"remote_read,omitempty"`
// Catches all undefined fields and must be empty after parsing. // Catches all undefined fields and must be empty after parsing.
XXX map[string]interface{} `yaml:",inline"` XXX map[string]interface{} `yaml:",inline"`
@ -1296,15 +1302,16 @@ func (re Regexp) MarshalYAML() (interface{}, error) {
return nil, nil return nil, nil
} }
// RemoteWriteConfig is the configuration for remote storage. // RemoteWriteConfig is the configuration for writing to remote storage.
type RemoteWriteConfig struct { type RemoteWriteConfig struct {
URL *URL `yaml:"url,omitempty"` URL *URL `yaml:"url,omitempty"`
RemoteTimeout model.Duration `yaml:"remote_timeout,omitempty"` RemoteTimeout model.Duration `yaml:"remote_timeout,omitempty"`
BasicAuth *BasicAuth `yaml:"basic_auth,omitempty"`
TLSConfig TLSConfig `yaml:"tls_config,omitempty"`
ProxyURL URL `yaml:"proxy_url,omitempty"`
WriteRelabelConfigs []*RelabelConfig `yaml:"write_relabel_configs,omitempty"` WriteRelabelConfigs []*RelabelConfig `yaml:"write_relabel_configs,omitempty"`
// We cannot do proper Go type embedding below as the parser will then parse
// values arbitrarily into the overflow maps of further-down types.
HTTPClientConfig HTTPClientConfig `yaml:",inline"`
// Catches all undefined fields and must be empty after parsing. // Catches all undefined fields and must be empty after parsing.
XXX map[string]interface{} `yaml:",inline"` XXX map[string]interface{} `yaml:",inline"`
} }
@ -1321,3 +1328,29 @@ func (c *RemoteWriteConfig) UnmarshalYAML(unmarshal func(interface{}) error) err
} }
return nil return nil
} }
// RemoteReadConfig is the configuration for reading from remote storage.
type RemoteReadConfig struct {
URL *URL `yaml:"url,omitempty"`
RemoteTimeout model.Duration `yaml:"remote_timeout,omitempty"`
// We cannot do proper Go type embedding below as the parser will then parse
// values arbitrarily into the overflow maps of further-down types.
HTTPClientConfig HTTPClientConfig `yaml:",inline"`
// Catches all undefined fields and must be empty after parsing.
XXX map[string]interface{} `yaml:",inline"`
}
// UnmarshalYAML implements the yaml.Unmarshaler interface.
func (c *RemoteReadConfig) UnmarshalYAML(unmarshal func(interface{}) error) error {
*c = DefaultRemoteReadConfig
type plain RemoteReadConfig
if err := unmarshal((*plain)(c)); err != nil {
return err
}
if err := checkOverflow(c.XXX, "remote_read"); err != nil {
return err
}
return nil
}

View file

@ -24,10 +24,9 @@ import (
"time" "time"
"github.com/prometheus/common/log" "github.com/prometheus/common/log"
"github.com/prometheus/common/model" "github.com/prometheus/common/model"
"golang.org/x/net/context"
"github.com/prometheus/prometheus/util/httputil" "golang.org/x/net/context/ctxhttp"
) )
const ( const (
@ -38,14 +37,14 @@ const (
// Client allows sending batches of Prometheus samples to OpenTSDB. // Client allows sending batches of Prometheus samples to OpenTSDB.
type Client struct { type Client struct {
url string url string
httpClient *http.Client timeout time.Duration
} }
// NewClient creates a new Client. // NewClient creates a new Client.
func NewClient(url string, timeout time.Duration) *Client { func NewClient(url string, timeout time.Duration) *Client {
return &Client{ return &Client{
url: url, url: url,
httpClient: httputil.NewDeadlineClient(timeout, nil), timeout: timeout,
} }
} }
@ -100,11 +99,10 @@ func (c *Client) Store(samples model.Samples) error {
return err return err
} }
resp, err := c.httpClient.Post( ctx, cancel := context.WithTimeout(context.Background(), c.timeout)
u.String(), defer cancel()
contentTypeJSON,
bytes.NewBuffer(buf), resp, err := ctxhttp.Post(ctx, http.DefaultClient, u.String(), contentTypeJSON, bytes.NewBuffer(buf))
)
if err != nil { if err != nil {
return err return err
} }

View file

@ -35,7 +35,7 @@ import (
"github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/discovery" "github.com/prometheus/prometheus/discovery"
"github.com/prometheus/prometheus/relabel" "github.com/prometheus/prometheus/relabel"
"github.com/prometheus/prometheus/retrieval" "github.com/prometheus/prometheus/util/httputil"
) )
const ( const (
@ -435,7 +435,7 @@ type alertmanagerSet struct {
} }
func newAlertmanagerSet(cfg *config.AlertmanagerConfig) (*alertmanagerSet, error) { func newAlertmanagerSet(cfg *config.AlertmanagerConfig) (*alertmanagerSet, error) {
client, err := retrieval.NewHTTPClient(cfg.HTTPClientConfig) client, err := httputil.NewClientFromConfig(cfg.HTTPClientConfig)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View file

@ -31,6 +31,7 @@ import (
"github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/storage/local" "github.com/prometheus/prometheus/storage/local"
"github.com/prometheus/prometheus/util/httputil"
) )
const ( const (
@ -114,7 +115,7 @@ type scrapePool struct {
} }
func newScrapePool(ctx context.Context, cfg *config.ScrapeConfig, app storage.SampleAppender) *scrapePool { func newScrapePool(ctx context.Context, cfg *config.ScrapeConfig, app storage.SampleAppender) *scrapePool {
client, err := NewHTTPClient(cfg.HTTPClientConfig) client, err := httputil.NewClientFromConfig(cfg.HTTPClientConfig)
if err != nil { if err != nil {
// Any errors that could occur here should be caught during config validation. // Any errors that could occur here should be caught during config validation.
log.Errorf("Error creating HTTP client for job %q: %s", cfg.JobName, err) log.Errorf("Error creating HTTP client for job %q: %s", cfg.JobName, err)
@ -161,7 +162,7 @@ func (sp *scrapePool) reload(cfg *config.ScrapeConfig) {
sp.mtx.Lock() sp.mtx.Lock()
defer sp.mtx.Unlock() defer sp.mtx.Unlock()
client, err := NewHTTPClient(cfg.HTTPClientConfig) client, err := httputil.NewClientFromConfig(cfg.HTTPClientConfig)
if err != nil { if err != nil {
// Any errors that could occur here should be caught during config validation. // Any errors that could occur here should be caught during config validation.
log.Errorf("Error creating HTTP client for job %q: %s", cfg.JobName, err) log.Errorf("Error creating HTTP client for job %q: %s", cfg.JobName, err)

View file

@ -16,9 +16,7 @@ package retrieval
import ( import (
"fmt" "fmt"
"hash/fnv" "hash/fnv"
"io/ioutil"
"net" "net"
"net/http"
"net/url" "net/url"
"strings" "strings"
"sync" "sync"
@ -29,7 +27,6 @@ import (
"github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/relabel" "github.com/prometheus/prometheus/relabel"
"github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/util/httputil"
) )
// TargetHealth describes the health state of a target. // TargetHealth describes the health state of a target.
@ -67,43 +64,6 @@ func NewTarget(labels, discoveredLabels model.LabelSet, params url.Values) *Targ
} }
} }
// NewHTTPClient returns a new HTTP client configured for the given scrape configuration.
func NewHTTPClient(cfg config.HTTPClientConfig) (*http.Client, error) {
tlsConfig, err := httputil.NewTLSConfig(cfg.TLSConfig)
if err != nil {
return nil, err
}
// The only timeout we care about is the configured scrape timeout.
// It is applied on request. So we leave out any timings here.
var rt http.RoundTripper = &http.Transport{
Proxy: http.ProxyURL(cfg.ProxyURL.URL),
DisableKeepAlives: true,
TLSClientConfig: tlsConfig,
}
// If a bearer token is provided, create a round tripper that will set the
// Authorization header correctly on each request.
bearerToken := cfg.BearerToken
if len(bearerToken) == 0 && len(cfg.BearerTokenFile) > 0 {
b, err := ioutil.ReadFile(cfg.BearerTokenFile)
if err != nil {
return nil, fmt.Errorf("unable to read bearer token file %s: %s", cfg.BearerTokenFile, err)
}
bearerToken = strings.TrimSpace(string(b))
}
if len(bearerToken) > 0 {
rt = httputil.NewBearerAuthRoundTripper(bearerToken, rt)
}
if cfg.BasicAuth != nil {
rt = httputil.NewBasicAuthRoundTripper(cfg.BasicAuth.Username, cfg.BasicAuth.Password, rt)
}
// Return a new client with the configured round tripper.
return httputil.NewClient(rt), nil
}
func (t *Target) String() string { func (t *Target) String() string {
return t.URL().String() return t.URL().String()
} }

View file

@ -29,6 +29,7 @@ import (
"github.com/prometheus/common/model" "github.com/prometheus/common/model"
"github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/util/httputil"
) )
const ( const (
@ -154,7 +155,7 @@ func TestNewHTTPBearerToken(t *testing.T) {
cfg := config.HTTPClientConfig{ cfg := config.HTTPClientConfig{
BearerToken: "1234", BearerToken: "1234",
} }
c, err := NewHTTPClient(cfg) c, err := httputil.NewClientFromConfig(cfg)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -181,7 +182,7 @@ func TestNewHTTPBearerTokenFile(t *testing.T) {
cfg := config.HTTPClientConfig{ cfg := config.HTTPClientConfig{
BearerTokenFile: "testdata/bearertoken.txt", BearerTokenFile: "testdata/bearertoken.txt",
} }
c, err := NewHTTPClient(cfg) c, err := httputil.NewClientFromConfig(cfg)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -210,7 +211,7 @@ func TestNewHTTPBasicAuth(t *testing.T) {
Password: "password123", Password: "password123",
}, },
} }
c, err := NewHTTPClient(cfg) c, err := httputil.NewClientFromConfig(cfg)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -238,7 +239,7 @@ func TestNewHTTPCACert(t *testing.T) {
CAFile: caCertPath, CAFile: caCertPath,
}, },
} }
c, err := NewHTTPClient(cfg) c, err := httputil.NewClientFromConfig(cfg)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -272,7 +273,7 @@ func TestNewHTTPClientCert(t *testing.T) {
KeyFile: "testdata/client.key", KeyFile: "testdata/client.key",
}, },
} }
c, err := NewHTTPClient(cfg) c, err := httputil.NewClientFromConfig(cfg)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -301,7 +302,7 @@ func TestNewHTTPWithServerName(t *testing.T) {
ServerName: "prometheus.rocks", ServerName: "prometheus.rocks",
}, },
} }
c, err := NewHTTPClient(cfg) c, err := httputil.NewClientFromConfig(cfg)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -330,7 +331,7 @@ func TestNewHTTPWithBadServerName(t *testing.T) {
ServerName: "badname", ServerName: "badname",
}, },
} }
c, err := NewHTTPClient(cfg) c, err := httputil.NewClientFromConfig(cfg)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -369,7 +370,7 @@ func TestNewClientWithBadTLSConfig(t *testing.T) {
KeyFile: "testdata/nonexistent_client.key", KeyFile: "testdata/nonexistent_client.key",
}, },
} }
_, err := NewHTTPClient(cfg) _, err := httputil.NewClientFromConfig(cfg)
if err == nil { if err == nil {
t.Fatalf("Expected error, got nil.") t.Fatalf("Expected error, got nil.")
} }

243
storage/fanin/fanin.go Normal file
View file

@ -0,0 +1,243 @@
// Copyright 2017 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package fanin
import (
"sort"
"time"
"golang.org/x/net/context"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/storage/local"
"github.com/prometheus/prometheus/storage/metric"
"github.com/prometheus/prometheus/storage/remote"
)
type contextKey string
const ctxLocalOnly contextKey = "local-only"
// WithLocalOnly decorates a context to indicate that a query should
// only be executed against local data.
func WithLocalOnly(ctx context.Context) context.Context {
return context.WithValue(ctx, ctxLocalOnly, struct{}{})
}
func localOnly(ctx context.Context) bool {
return ctx.Value(ctxLocalOnly) == struct{}{}
}
// Queryable is a local.Queryable that reads from local and remote storage.
type Queryable struct {
Local promql.Queryable
Remote *remote.Reader
}
// Querier implements local.Queryable.
func (q Queryable) Querier() (local.Querier, error) {
localQuerier, err := q.Local.Querier()
if err != nil {
return nil, err
}
fq := querier{
local: localQuerier,
remotes: q.Remote.Queriers(),
}
return fq, nil
}
type querier struct {
local local.Querier
remotes []local.Querier
}
func (q querier) QueryRange(ctx context.Context, from, through model.Time, matchers ...*metric.LabelMatcher) ([]local.SeriesIterator, error) {
return q.query(ctx, func(q local.Querier) ([]local.SeriesIterator, error) {
return q.QueryRange(ctx, from, through, matchers...)
})
}
func (q querier) QueryInstant(ctx context.Context, ts model.Time, stalenessDelta time.Duration, matchers ...*metric.LabelMatcher) ([]local.SeriesIterator, error) {
return q.query(ctx, func(q local.Querier) ([]local.SeriesIterator, error) {
return q.QueryInstant(ctx, ts, stalenessDelta, matchers...)
})
}
func (q querier) query(ctx context.Context, qFn func(q local.Querier) ([]local.SeriesIterator, error)) ([]local.SeriesIterator, error) {
localIts, err := qFn(q.local)
if err != nil {
return nil, err
}
if len(q.remotes) == 0 || localOnly(ctx) {
return localIts, nil
}
fpToIt := map[model.Fingerprint]*mergeIterator{}
for _, it := range localIts {
fp := it.Metric().Metric.Fingerprint()
fpToIt[fp] = &mergeIterator{local: it}
}
for _, q := range q.remotes {
its, err := qFn(q)
if err != nil {
return nil, err
}
mergeIterators(fpToIt, its)
}
its := make([]local.SeriesIterator, 0, len(fpToIt))
for _, it := range fpToIt {
its = append(its, it)
}
return its, nil
}
func (q querier) MetricsForLabelMatchers(ctx context.Context, from, through model.Time, matcherSets ...metric.LabelMatchers) ([]metric.Metric, error) {
return q.local.MetricsForLabelMatchers(ctx, from, through, matcherSets...)
}
func (q querier) LastSampleForLabelMatchers(ctx context.Context, cutoff model.Time, matcherSets ...metric.LabelMatchers) (model.Vector, error) {
return q.local.LastSampleForLabelMatchers(ctx, cutoff, matcherSets...)
}
func (q querier) LabelValuesForLabelName(ctx context.Context, ln model.LabelName) (model.LabelValues, error) {
return q.local.LabelValuesForLabelName(ctx, ln)
}
func (q querier) Close() error {
if q.local != nil {
if err := q.local.Close(); err != nil {
return err
}
}
for _, q := range q.remotes {
if err := q.Close(); err != nil {
return err
}
}
return nil
}
// mergeIterator is a SeriesIterator which merges query results for local and remote
// SeriesIterators. If a series has samples in a local iterator, remote samples are
// only considered before the first local sample of a series. This is to avoid things
// like downsampling on the side of the remote storage to interfere with rate(),
// irate(), etc.
type mergeIterator struct {
local local.SeriesIterator
remote []local.SeriesIterator
}
func (mit mergeIterator) ValueAtOrBeforeTime(t model.Time) model.SamplePair {
latest := model.ZeroSamplePair
if mit.local != nil {
latest = mit.local.ValueAtOrBeforeTime(t)
}
// We only need to look for a remote last sample if we don't have a local one
// at all. If we have a local one, by definition we would not care about earlier
// "last" samples, and we would not consider later ones as well, because we
// generally only consider remote samples that are older than the oldest
// local sample.
if latest == model.ZeroSamplePair {
for _, it := range mit.remote {
v := it.ValueAtOrBeforeTime(t)
if v.Timestamp.After(latest.Timestamp) {
latest = v
}
}
}
return latest
}
func (mit mergeIterator) RangeValues(interval metric.Interval) []model.SamplePair {
remoteCutoff := model.Latest
var values []model.SamplePair
if mit.local != nil {
values = mit.local.RangeValues(interval)
if len(values) > 0 {
remoteCutoff = values[0].Timestamp
}
}
for _, it := range mit.remote {
vs := it.RangeValues(interval)
n := sort.Search(len(vs), func(i int) bool {
return !vs[i].Timestamp.Before(remoteCutoff)
})
values = mergeSamples(values, vs[:n])
}
return values
}
func (mit mergeIterator) Metric() metric.Metric {
if mit.local != nil {
return mit.local.Metric()
}
// If there is no local iterator, there has to be at least one remote one in
// order for this iterator to have been created.
return mit.remote[0].Metric()
}
func (mit mergeIterator) Close() {
if mit.local != nil {
mit.local.Close()
}
for _, it := range mit.remote {
it.Close()
}
}
func mergeIterators(fpToIt map[model.Fingerprint]*mergeIterator, its []local.SeriesIterator) {
for _, it := range its {
fp := it.Metric().Metric.Fingerprint()
if fpIts, ok := fpToIt[fp]; !ok {
fpToIt[fp] = &mergeIterator{remote: []local.SeriesIterator{it}}
} else {
fpToIt[fp].remote = append(fpIts.remote, it)
}
}
}
// mergeSamples merges two lists of sample pairs and removes duplicate
// timestamps. It assumes that both lists are sorted by timestamp.
func mergeSamples(a, b []model.SamplePair) []model.SamplePair {
result := make([]model.SamplePair, 0, len(a)+len(b))
i, j := 0, 0
for i < len(a) && j < len(b) {
if a[i].Timestamp < b[j].Timestamp {
result = append(result, a[i])
i++
} else if a[i].Timestamp > b[j].Timestamp {
result = append(result, b[j])
j++
} else {
result = append(result, a[i])
i++
j++
}
}
result = append(result, a[i:]...)
result = append(result, b[j:]...)
return result
}

720
storage/fanin/fanin_test.go Normal file
View file

@ -0,0 +1,720 @@
// Copyright 2017 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package fanin
import (
"reflect"
"sort"
"testing"
"time"
"github.com/prometheus/common/model"
"golang.org/x/net/context"
"github.com/prometheus/prometheus/storage/local"
"github.com/prometheus/prometheus/storage/metric"
"github.com/prometheus/prometheus/storage/remote"
)
type testQuerier struct {
series model.Matrix
}
func (q testQuerier) QueryRange(ctx context.Context, from, through model.Time, matchers ...*metric.LabelMatcher) ([]local.SeriesIterator, error) {
var outMatrix model.Matrix
for _, s := range q.series {
for _, m := range matchers {
if !m.Match(s.Metric[m.Name]) {
continue
}
}
fromIdx := sort.Search(len(s.Values), func(i int) bool {
return !s.Values[i].Timestamp.Before(from)
})
throughIdx := sort.Search(len(s.Values), func(i int) bool {
return s.Values[i].Timestamp.After(through)
})
outMatrix = append(outMatrix, &model.SampleStream{
Metric: s.Metric,
Values: s.Values[fromIdx:throughIdx],
})
}
return remote.MatrixToIterators(outMatrix, nil)
}
func (q testQuerier) QueryInstant(ctx context.Context, ts model.Time, stalenessDelta time.Duration, matchers ...*metric.LabelMatcher) ([]local.SeriesIterator, error) {
return q.QueryRange(ctx, ts.Add(-stalenessDelta), ts, matchers...)
}
func (q testQuerier) MetricsForLabelMatchers(ctx context.Context, from, through model.Time, matcherSets ...metric.LabelMatchers) ([]metric.Metric, error) {
fpToMetric := map[model.Fingerprint]model.Metric{}
for _, s := range q.series {
matched := false
for _, matchers := range matcherSets {
for _, m := range matchers {
if !m.Match(s.Metric[m.Name]) {
continue
}
}
matched = true
}
fromIdx := sort.Search(len(s.Values), func(i int) bool {
return !s.Values[i].Timestamp.Before(from)
})
throughIdx := sort.Search(len(s.Values), func(i int) bool {
return s.Values[i].Timestamp.After(through)
})
if fromIdx == throughIdx {
continue
}
if matched {
fpToMetric[s.Metric.Fingerprint()] = s.Metric
}
}
metrics := make([]metric.Metric, 0, len(fpToMetric))
for _, m := range fpToMetric {
metrics = append(metrics, metric.Metric{Metric: m})
}
return metrics, nil
}
func (q testQuerier) LastSampleForLabelMatchers(ctx context.Context, cutoff model.Time, matcherSets ...metric.LabelMatchers) (model.Vector, error) {
panic("not implemented")
}
func (q testQuerier) LabelValuesForLabelName(ctx context.Context, ln model.LabelName) (model.LabelValues, error) {
panic("not implemented")
}
func (q testQuerier) Close() error {
return nil
}
func TestQueryRange(t *testing.T) {
type query struct {
from model.Time
through model.Time
out model.Matrix
localOnly bool
}
tests := []struct {
name string
local model.Matrix
remote []model.Matrix
queries []query
}{
{
name: "duplicate samples are eliminated",
local: model.Matrix{
&model.SampleStream{
Metric: model.Metric{
model.MetricNameLabel: "testmetric",
},
Values: []model.SamplePair{
{
Timestamp: 1,
Value: 1,
},
{
Timestamp: 2,
Value: 2,
},
},
},
},
remote: []model.Matrix{
model.Matrix{
&model.SampleStream{
Metric: model.Metric{
model.MetricNameLabel: "testmetric",
},
Values: []model.SamplePair{
{
Timestamp: 0,
Value: 0,
},
{
Timestamp: 1,
Value: 1,
},
{
Timestamp: 2,
Value: 2,
},
},
},
},
model.Matrix{
&model.SampleStream{
Metric: model.Metric{
model.MetricNameLabel: "testmetric",
},
Values: []model.SamplePair{
{
Timestamp: 0,
Value: 0,
},
{
Timestamp: 1,
Value: 1,
},
{
Timestamp: 2,
Value: 2,
},
},
},
},
},
queries: []query{
{
from: 0,
through: 1,
out: model.Matrix{
&model.SampleStream{
Metric: model.Metric{
model.MetricNameLabel: "testmetric",
},
Values: []model.SamplePair{
{
Timestamp: 0,
Value: 0,
},
{
Timestamp: 1,
Value: 1,
},
},
},
},
},
},
},
{
name: "remote data is thrown away after first local sample",
local: model.Matrix{
&model.SampleStream{
Metric: model.Metric{
model.MetricNameLabel: "testmetric",
},
Values: []model.SamplePair{
{
Timestamp: 2,
Value: 2,
},
{
Timestamp: 4,
Value: 4,
},
},
},
},
remote: []model.Matrix{
model.Matrix{
&model.SampleStream{
Metric: model.Metric{
model.MetricNameLabel: "testmetric",
},
Values: []model.SamplePair{
{
Timestamp: 0,
Value: 0,
},
{
Timestamp: 2,
Value: 20,
},
{
Timestamp: 4,
Value: 40,
},
},
},
},
model.Matrix{
&model.SampleStream{
Metric: model.Metric{
model.MetricNameLabel: "testmetric",
},
Values: []model.SamplePair{
{
Timestamp: 1,
Value: 10,
},
{
Timestamp: 3,
Value: 30,
},
},
},
},
},
queries: []query{
{
from: 0,
through: 4,
out: model.Matrix{
&model.SampleStream{
Metric: model.Metric{
model.MetricNameLabel: "testmetric",
},
Values: []model.SamplePair{
{
Timestamp: 0,
Value: 0,
},
{
Timestamp: 1,
Value: 10,
},
{
Timestamp: 2,
Value: 2,
},
{
Timestamp: 4,
Value: 4,
},
},
},
},
},
{
from: 2,
through: 2,
out: model.Matrix{
&model.SampleStream{
Metric: model.Metric{
model.MetricNameLabel: "testmetric",
},
Values: []model.SamplePair{
{
Timestamp: 2,
Value: 2,
},
},
},
},
},
},
},
{
name: "no local data",
remote: []model.Matrix{
model.Matrix{
&model.SampleStream{
Metric: model.Metric{
model.MetricNameLabel: "testmetric",
},
Values: []model.SamplePair{
{
Timestamp: 0,
Value: 0,
},
{
Timestamp: 2,
Value: 20,
},
{
Timestamp: 4,
Value: 40,
},
},
},
},
model.Matrix{
&model.SampleStream{
Metric: model.Metric{
model.MetricNameLabel: "testmetric",
},
Values: []model.SamplePair{
{
Timestamp: 1,
Value: 10,
},
{
Timestamp: 3,
Value: 30,
},
},
},
},
},
queries: []query{
{
from: 0,
through: 4,
out: model.Matrix{
&model.SampleStream{
Metric: model.Metric{
model.MetricNameLabel: "testmetric",
},
Values: []model.SamplePair{
{
Timestamp: 0,
Value: 0,
},
{
Timestamp: 1,
Value: 10,
},
{
Timestamp: 2,
Value: 20,
},
{
Timestamp: 3,
Value: 30,
},
{
Timestamp: 4,
Value: 40,
},
},
},
},
},
{
from: 2,
through: 2,
out: model.Matrix{
&model.SampleStream{
Metric: model.Metric{
model.MetricNameLabel: "testmetric",
},
Values: []model.SamplePair{
{
Timestamp: 2,
Value: 20,
},
},
},
},
},
},
},
{
name: "only local data",
local: model.Matrix{
&model.SampleStream{
Metric: model.Metric{
model.MetricNameLabel: "testmetric",
},
Values: []model.SamplePair{
{
Timestamp: 0,
Value: 0,
},
{
Timestamp: 1,
Value: 1,
},
{
Timestamp: 2,
Value: 2,
},
},
},
},
queries: []query{
{
from: 0,
through: 4,
out: model.Matrix{
&model.SampleStream{
Metric: model.Metric{
model.MetricNameLabel: "testmetric",
},
Values: []model.SamplePair{
{
Timestamp: 0,
Value: 0,
},
{
Timestamp: 1,
Value: 1,
},
{
Timestamp: 2,
Value: 2,
},
},
},
},
},
{
from: 3,
through: 3,
out: model.Matrix{
&model.SampleStream{
Metric: model.Metric{
model.MetricNameLabel: "testmetric",
},
Values: []model.SamplePair{
{
Timestamp: 2,
Value: 2,
},
},
},
},
},
},
},
{
name: "context value to indicate only local querying is set",
local: model.Matrix{
&model.SampleStream{
Metric: model.Metric{
model.MetricNameLabel: "testmetric",
},
Values: []model.SamplePair{
{
Timestamp: 2,
Value: 2,
},
{
Timestamp: 3,
Value: 3,
},
},
},
},
remote: []model.Matrix{
model.Matrix{
&model.SampleStream{
Metric: model.Metric{
model.MetricNameLabel: "testmetric",
},
Values: []model.SamplePair{
{
Timestamp: 0,
Value: 0,
},
{
Timestamp: 1,
Value: 1,
},
{
Timestamp: 2,
Value: 2,
},
},
},
},
},
queries: []query{
{
from: 0,
through: 3,
localOnly: true,
out: model.Matrix{
&model.SampleStream{
Metric: model.Metric{
model.MetricNameLabel: "testmetric",
},
Values: []model.SamplePair{
{
Timestamp: 2,
Value: 2,
},
{
Timestamp: 3,
Value: 3,
},
},
},
},
},
{
from: 1,
through: 1,
localOnly: true,
out: model.Matrix{
&model.SampleStream{
Metric: model.Metric{
model.MetricNameLabel: "testmetric",
},
Values: []model.SamplePair{model.ZeroSamplePair},
},
},
},
{
from: 2,
through: 2,
localOnly: true,
out: model.Matrix{
&model.SampleStream{
Metric: model.Metric{
model.MetricNameLabel: "testmetric",
},
Values: []model.SamplePair{
{
Timestamp: 2,
Value: 2,
},
},
},
},
},
},
},
}
matcher, err := metric.NewLabelMatcher(metric.Equal, model.MetricNameLabel, "testmetric")
if err != nil {
t.Fatal(err)
}
for _, test := range tests {
q := querier{
local: &testQuerier{test.local},
}
for _, m := range test.remote {
q.remotes = append(q.remotes, &testQuerier{m})
}
for i, query := range test.queries {
ctx := context.Background()
if query.localOnly {
ctx = WithLocalOnly(ctx)
}
var its []local.SeriesIterator
var err error
if query.from == query.through {
its, err = q.QueryInstant(ctx, query.from, 5*time.Minute, matcher)
} else {
its, err = q.QueryRange(ctx, query.from, query.through, matcher)
}
if err != nil {
t.Fatal(err)
}
if err = q.Close(); err != nil {
t.Fatal(err)
}
out := make(model.Matrix, 0, len(query.out))
for _, it := range its {
var values []model.SamplePair
if query.from == query.through {
values = []model.SamplePair{it.ValueAtOrBeforeTime(query.from)}
} else {
values = it.RangeValues(metric.Interval{
OldestInclusive: query.from,
NewestInclusive: query.through,
})
}
it.Close()
out = append(out, &model.SampleStream{
Metric: it.Metric().Metric,
Values: values,
})
}
sort.Sort(out)
sort.Sort(query.out)
if !reflect.DeepEqual(out, query.out) {
t.Fatalf("Test case %q, query %d: Unexpected query result;\n\ngot:\n\n%s\n\nwant:\n\n%s", test.name, i, out, query.out)
}
}
}
}
func TestMetricsForLabelMatchersIgnoresRemoteData(t *testing.T) {
q := querier{
local: &testQuerier{
series: model.Matrix{
&model.SampleStream{
Metric: model.Metric{
model.MetricNameLabel: "testmetric",
"testlabel": "testvalue1",
},
Values: []model.SamplePair{{1, 1}},
},
&model.SampleStream{
Metric: model.Metric{
model.MetricNameLabel: "testmetric",
"testlabel": "testvalue2",
},
Values: []model.SamplePair{{1, 1}},
},
},
},
remotes: []local.Querier{
&testQuerier{
series: model.Matrix{
&model.SampleStream{
Metric: model.Metric{
model.MetricNameLabel: "testmetric",
"testlabel": "testvalue2",
},
Values: []model.SamplePair{{1, 1}},
},
&model.SampleStream{
Metric: model.Metric{
model.MetricNameLabel: "testmetric",
"testlabel": "testvalue3",
},
Values: []model.SamplePair{{1, 1}},
},
},
},
},
}
matcher, err := metric.NewLabelMatcher(metric.Equal, model.MetricNameLabel, "testmetric")
if err != nil {
t.Fatal(err)
}
got, err := q.MetricsForLabelMatchers(context.Background(), 0, 1, metric.LabelMatchers{matcher})
if err != nil {
t.Fatal(err)
}
want := []metric.Metric{
{
Metric: model.Metric{
model.MetricNameLabel: "testmetric",
"testlabel": "testvalue1",
},
},
{
Metric: model.Metric{
model.MetricNameLabel: "testmetric",
"testlabel": "testvalue2",
},
},
}
if !reflect.DeepEqual(want, got) {
t.Fatalf("Unexpected metric returned;\n\nwant:\n\n%#v\n\ngot:\n\n%#v", want, got)
}
}

View file

@ -16,6 +16,7 @@ package remote
import ( import (
"bytes" "bytes"
"fmt" "fmt"
"io/ioutil"
"net/http" "net/http"
"time" "time"
@ -26,40 +27,36 @@ import (
"github.com/prometheus/common/model" "github.com/prometheus/common/model"
"github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/storage/metric"
"github.com/prometheus/prometheus/util/httputil" "github.com/prometheus/prometheus/util/httputil"
) )
// Client allows sending batches of Prometheus samples to an HTTP endpoint. // Client allows reading and writing from/to a remote HTTP endpoint.
type Client struct { type Client struct {
index int // Used to differentiate metrics. index int // Used to differentiate metrics.
url config.URL url *config.URL
client *http.Client client *http.Client
timeout time.Duration timeout time.Duration
} }
type clientConfig struct {
url *config.URL
timeout model.Duration
httpClientConfig config.HTTPClientConfig
}
// NewClient creates a new Client. // NewClient creates a new Client.
func NewClient(index int, conf *config.RemoteWriteConfig) (*Client, error) { func NewClient(index int, conf *clientConfig) (*Client, error) {
tlsConfig, err := httputil.NewTLSConfig(conf.TLSConfig) httpClient, err := httputil.NewClientFromConfig(conf.httpClientConfig)
if err != nil { if err != nil {
return nil, err return nil, err
} }
// The only timeout we care about is the configured push timeout.
// It is applied on request. So we leave out any timings here.
var rt http.RoundTripper = &http.Transport{
Proxy: http.ProxyURL(conf.ProxyURL.URL),
TLSClientConfig: tlsConfig,
}
if conf.BasicAuth != nil {
rt = httputil.NewBasicAuthRoundTripper(conf.BasicAuth.Username, conf.BasicAuth.Password, rt)
}
return &Client{ return &Client{
index: index, index: index,
url: *conf.URL, url: conf.url,
client: httputil.NewClient(rt), client: httpClient,
timeout: time.Duration(conf.RemoteTimeout), timeout: time.Duration(conf.timeout),
}, nil }, nil
} }
@ -103,8 +100,12 @@ func (c *Client) Store(samples model.Samples) error {
return err return err
} }
httpReq.Header.Add("Content-Encoding", "snappy") httpReq.Header.Add("Content-Encoding", "snappy")
httpReq.Header.Set("Content-Type", "application/x-protobuf")
httpReq.Header.Set("X-Prometheus-Remote-Write-Version", "0.0.1")
ctx, cancel := context.WithTimeout(context.Background(), c.timeout)
defer cancel()
ctx, _ := context.WithTimeout(context.Background(), c.timeout)
httpResp, err := ctxhttp.Do(ctx, c.client, httpReq) httpResp, err := ctxhttp.Do(ctx, c.client, httpReq)
if err != nil { if err != nil {
return err return err
@ -120,3 +121,108 @@ func (c *Client) Store(samples model.Samples) error {
func (c Client) Name() string { func (c Client) Name() string {
return fmt.Sprintf("%d:%s", c.index, c.url) return fmt.Sprintf("%d:%s", c.index, c.url)
} }
// Read reads from a remote endpoint.
func (c *Client) Read(ctx context.Context, from, through model.Time, matchers metric.LabelMatchers) (model.Matrix, error) {
req := &ReadRequest{
// TODO: Support batching multiple queries into one read request,
// as the protobuf interface allows for it.
Queries: []*Query{{
StartTimestampMs: int64(from),
EndTimestampMs: int64(through),
Matchers: labelMatchersToProto(matchers),
}},
}
data, err := proto.Marshal(req)
if err != nil {
return nil, fmt.Errorf("unable to marshal read request: %v", err)
}
buf := bytes.Buffer{}
if _, err := snappy.NewWriter(&buf).Write(data); err != nil {
return nil, err
}
httpReq, err := http.NewRequest("POST", c.url.String(), &buf)
if err != nil {
return nil, fmt.Errorf("unable to create request: %v", err)
}
httpReq.Header.Set("Content-Type", "application/x-protobuf")
httpReq.Header.Set("X-Prometheus-Remote-Read-Version", "0.0.1")
ctx, cancel := context.WithTimeout(ctx, c.timeout)
defer cancel()
httpResp, err := ctxhttp.Do(ctx, c.client, httpReq)
if err != nil {
return nil, fmt.Errorf("error sending request: %v", err)
}
defer httpResp.Body.Close()
if httpResp.StatusCode/100 != 2 {
return nil, fmt.Errorf("server returned HTTP status %s", httpResp.Status)
}
if data, err = ioutil.ReadAll(snappy.NewReader(httpResp.Body)); err != nil {
return nil, fmt.Errorf("error reading response: %v", err)
}
var resp ReadResponse
err = proto.Unmarshal(data, &resp)
if err != nil {
return nil, fmt.Errorf("unable to unmarshal response body: %v", err)
}
return matrixFromProto(resp.Timeseries), nil
}
func labelMatchersToProto(matchers metric.LabelMatchers) []*LabelMatcher {
pbMatchers := make([]*LabelMatcher, 0, len(matchers))
for _, m := range matchers {
var mType MatchType
switch m.Type {
case metric.Equal:
mType = MatchType_EQUAL
case metric.NotEqual:
mType = MatchType_NOT_EQUAL
case metric.RegexMatch:
mType = MatchType_REGEX_MATCH
case metric.RegexNoMatch:
mType = MatchType_REGEX_NO_MATCH
default:
panic("invalid matcher type")
}
pbMatchers = append(pbMatchers, &LabelMatcher{
Type: mType,
Name: string(m.Name),
Value: string(m.Value),
})
}
return pbMatchers
}
func matrixFromProto(seriesSet []*TimeSeries) model.Matrix {
m := make(model.Matrix, 0, len(seriesSet))
for _, ts := range seriesSet {
var ss model.SampleStream
ss.Metric = labelPairsToMetric(ts.Labels)
ss.Values = make([]model.SamplePair, 0, len(ts.Samples))
for _, s := range ts.Samples {
ss.Values = append(ss.Values, model.SamplePair{
Value: model.SampleValue(s.Value),
Timestamp: model.Time(s.TimestampMs),
})
}
m = append(m, &ss)
}
return m
}
func labelPairsToMetric(labelPairs []*LabelPair) model.Metric {
metric := make(model.Metric, len(labelPairs))
for _, l := range labelPairs {
metric[model.LabelName(l.Name)] = model.LabelValue(l.Value)
}
return metric
}

View file

@ -0,0 +1,64 @@
// Copyright 2017 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package remote
import (
"sort"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/storage/metric"
)
// This is a struct and not just a renamed type because otherwise the Metric
// field and Metric() methods would clash.
type sampleStreamIterator struct {
ss *model.SampleStream
}
func (it sampleStreamIterator) Metric() metric.Metric {
return metric.Metric{Metric: it.ss.Metric}
}
func (it sampleStreamIterator) ValueAtOrBeforeTime(ts model.Time) model.SamplePair {
// TODO: This is a naive inefficient approach - in reality, queries go mostly
// linearly through iterators, and we will want to make successive calls to
// this method more efficient by taking into account the last result index
// somehow (similarly to how it's done in Prometheus's
// memorySeriesIterators).
i := sort.Search(len(it.ss.Values), func(n int) bool {
return it.ss.Values[n].Timestamp.After(ts)
})
if i == 0 {
return model.SamplePair{Timestamp: model.Earliest}
}
return it.ss.Values[i-1]
}
func (it sampleStreamIterator) RangeValues(in metric.Interval) []model.SamplePair {
n := len(it.ss.Values)
start := sort.Search(n, func(i int) bool {
return !it.ss.Values[i].Timestamp.Before(in.OldestInclusive)
})
end := sort.Search(n, func(i int) bool {
return it.ss.Values[i].Timestamp.After(in.NewestInclusive)
})
if start == n {
return nil
}
return it.ss.Values[start:end]
}
func (it sampleStreamIterator) Close() {}

115
storage/remote/read.go Normal file
View file

@ -0,0 +1,115 @@
// Copyright 2017 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package remote
import (
"sync"
"time"
"github.com/prometheus/common/model"
"golang.org/x/net/context"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/storage/local"
"github.com/prometheus/prometheus/storage/metric"
)
// Reader allows reading from multiple remote sources.
type Reader struct {
mtx sync.Mutex
clients []*Client
}
// ApplyConfig updates the state as the new config requires.
func (r *Reader) ApplyConfig(conf *config.Config) error {
clients := []*Client{}
for i, rrConf := range conf.RemoteReadConfigs {
c, err := NewClient(i, &clientConfig{
url: rrConf.URL,
timeout: rrConf.RemoteTimeout,
httpClientConfig: rrConf.HTTPClientConfig,
})
if err != nil {
return err
}
clients = append(clients, c)
}
r.mtx.Lock()
defer r.mtx.Unlock()
r.clients = clients
return nil
}
// Queriers returns a list of Queriers for the currently configured
// remote read endpoints.
func (r *Reader) Queriers() []local.Querier {
r.mtx.Lock()
defer r.mtx.Unlock()
queriers := make([]local.Querier, 0, len(r.clients))
for _, c := range r.clients {
queriers = append(queriers, &querier{client: c})
}
return queriers
}
// querier is an adapter to make a Client usable as a promql.Querier.
type querier struct {
client *Client
}
func (q *querier) QueryRange(ctx context.Context, from, through model.Time, matchers ...*metric.LabelMatcher) ([]local.SeriesIterator, error) {
return MatrixToIterators(q.client.Read(ctx, from, through, matchers))
}
func (q *querier) QueryInstant(ctx context.Context, ts model.Time, stalenessDelta time.Duration, matchers ...*metric.LabelMatcher) ([]local.SeriesIterator, error) {
return MatrixToIterators(q.client.Read(ctx, ts.Add(-stalenessDelta), ts, matchers))
}
// MatrixToIterators returns series iterators for a given matrix.
func MatrixToIterators(m model.Matrix, err error) ([]local.SeriesIterator, error) {
if err != nil {
return nil, err
}
its := make([]local.SeriesIterator, 0, len(m))
for _, ss := range m {
its = append(its, sampleStreamIterator{
ss: ss,
})
}
return its, nil
}
func (q *querier) MetricsForLabelMatchers(ctx context.Context, from, through model.Time, matcherSets ...metric.LabelMatchers) ([]metric.Metric, error) {
// TODO: Implement remote metadata querying.
return nil, nil
}
func (q *querier) LastSampleForLabelMatchers(ctx context.Context, cutoff model.Time, matcherSets ...metric.LabelMatchers) (model.Vector, error) {
// TODO: Implement remote last sample querying.
return nil, nil
}
func (q *querier) LabelValuesForLabelName(ctx context.Context, ln model.LabelName) (model.LabelValues, error) {
// TODO: Implement remote metadata querying.
return nil, nil
}
func (q *querier) Close() error {
return nil
}

View file

@ -13,6 +13,10 @@ It has these top-level messages:
LabelPair LabelPair
TimeSeries TimeSeries
WriteRequest WriteRequest
ReadRequest
ReadResponse
Query
LabelMatcher
*/ */
package remote package remote
@ -31,6 +35,33 @@ var _ = math.Inf
// proto package needs to be updated. // proto package needs to be updated.
const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package
type MatchType int32
const (
MatchType_EQUAL MatchType = 0
MatchType_NOT_EQUAL MatchType = 1
MatchType_REGEX_MATCH MatchType = 2
MatchType_REGEX_NO_MATCH MatchType = 3
)
var MatchType_name = map[int32]string{
0: "EQUAL",
1: "NOT_EQUAL",
2: "REGEX_MATCH",
3: "REGEX_NO_MATCH",
}
var MatchType_value = map[string]int32{
"EQUAL": 0,
"NOT_EQUAL": 1,
"REGEX_MATCH": 2,
"REGEX_NO_MATCH": 3,
}
func (x MatchType) String() string {
return proto.EnumName(MatchType_name, int32(x))
}
func (MatchType) EnumDescriptor() ([]byte, []int) { return fileDescriptor0, []int{0} }
type Sample struct { type Sample struct {
Value float64 `protobuf:"fixed64,1,opt,name=value" json:"value,omitempty"` Value float64 `protobuf:"fixed64,1,opt,name=value" json:"value,omitempty"`
TimestampMs int64 `protobuf:"varint,2,opt,name=timestamp_ms,json=timestampMs" json:"timestamp_ms,omitempty"` TimestampMs int64 `protobuf:"varint,2,opt,name=timestamp_ms,json=timestampMs" json:"timestamp_ms,omitempty"`
@ -92,29 +123,106 @@ func (m *WriteRequest) GetTimeseries() []*TimeSeries {
return nil return nil
} }
type ReadRequest struct {
Queries []*Query `protobuf:"bytes,1,rep,name=queries" json:"queries,omitempty"`
}
func (m *ReadRequest) Reset() { *m = ReadRequest{} }
func (m *ReadRequest) String() string { return proto.CompactTextString(m) }
func (*ReadRequest) ProtoMessage() {}
func (*ReadRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{4} }
func (m *ReadRequest) GetQueries() []*Query {
if m != nil {
return m.Queries
}
return nil
}
type ReadResponse struct {
Timeseries []*TimeSeries `protobuf:"bytes,1,rep,name=timeseries" json:"timeseries,omitempty"`
}
func (m *ReadResponse) Reset() { *m = ReadResponse{} }
func (m *ReadResponse) String() string { return proto.CompactTextString(m) }
func (*ReadResponse) ProtoMessage() {}
func (*ReadResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{5} }
func (m *ReadResponse) GetTimeseries() []*TimeSeries {
if m != nil {
return m.Timeseries
}
return nil
}
type Query struct {
StartTimestampMs int64 `protobuf:"varint,1,opt,name=start_timestamp_ms,json=startTimestampMs" json:"start_timestamp_ms,omitempty"`
EndTimestampMs int64 `protobuf:"varint,2,opt,name=end_timestamp_ms,json=endTimestampMs" json:"end_timestamp_ms,omitempty"`
Matchers []*LabelMatcher `protobuf:"bytes,3,rep,name=matchers" json:"matchers,omitempty"`
}
func (m *Query) Reset() { *m = Query{} }
func (m *Query) String() string { return proto.CompactTextString(m) }
func (*Query) ProtoMessage() {}
func (*Query) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{6} }
func (m *Query) GetMatchers() []*LabelMatcher {
if m != nil {
return m.Matchers
}
return nil
}
type LabelMatcher struct {
Type MatchType `protobuf:"varint,1,opt,name=type,enum=remote.MatchType" json:"type,omitempty"`
Name string `protobuf:"bytes,2,opt,name=name" json:"name,omitempty"`
Value string `protobuf:"bytes,3,opt,name=value" json:"value,omitempty"`
}
func (m *LabelMatcher) Reset() { *m = LabelMatcher{} }
func (m *LabelMatcher) String() string { return proto.CompactTextString(m) }
func (*LabelMatcher) ProtoMessage() {}
func (*LabelMatcher) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{7} }
func init() { func init() {
proto.RegisterType((*Sample)(nil), "remote.Sample") proto.RegisterType((*Sample)(nil), "remote.Sample")
proto.RegisterType((*LabelPair)(nil), "remote.LabelPair") proto.RegisterType((*LabelPair)(nil), "remote.LabelPair")
proto.RegisterType((*TimeSeries)(nil), "remote.TimeSeries") proto.RegisterType((*TimeSeries)(nil), "remote.TimeSeries")
proto.RegisterType((*WriteRequest)(nil), "remote.WriteRequest") proto.RegisterType((*WriteRequest)(nil), "remote.WriteRequest")
proto.RegisterType((*ReadRequest)(nil), "remote.ReadRequest")
proto.RegisterType((*ReadResponse)(nil), "remote.ReadResponse")
proto.RegisterType((*Query)(nil), "remote.Query")
proto.RegisterType((*LabelMatcher)(nil), "remote.LabelMatcher")
proto.RegisterEnum("remote.MatchType", MatchType_name, MatchType_value)
} }
func init() { proto.RegisterFile("remote.proto", fileDescriptor0) } func init() { proto.RegisterFile("remote.proto", fileDescriptor0) }
var fileDescriptor0 = []byte{ var fileDescriptor0 = []byte{
// 216 bytes of a gzipped FileDescriptorProto // 397 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x4c, 0x90, 0x3f, 0x4f, 0x80, 0x30, 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x9c, 0x52, 0x5d, 0x6b, 0xe2, 0x40,
0x10, 0xc5, 0x03, 0x68, 0x0d, 0x07, 0x31, 0xf1, 0xe2, 0xc0, 0xa8, 0x9d, 0x70, 0x61, 0xc0, 0xf8, 0x14, 0xdd, 0x24, 0x1a, 0x37, 0xd7, 0x98, 0xcd, 0x0e, 0x3e, 0xf8, 0xb8, 0x1b, 0x58, 0xd6, 0x5d,
0x01, 0x74, 0xd6, 0xc4, 0x14, 0x13, 0x47, 0x53, 0x92, 0x1b, 0x9a, 0xb4, 0x82, 0x6d, 0xf1, 0xf3, 0x8a, 0x14, 0x4b, 0xfb, 0x6e, 0x8b, 0xb4, 0x14, 0x3f, 0xea, 0x98, 0xd2, 0xbe, 0x85, 0xb1, 0x0e,
0x5b, 0x5a, 0xfe, 0xb8, 0xf5, 0xdd, 0xbd, 0x7b, 0xf7, 0xeb, 0x41, 0x6d, 0xc9, 0x4c, 0x9e, 0xba, 0x34, 0x90, 0x98, 0x34, 0x33, 0x16, 0xfc, 0x19, 0xfd, 0xc7, 0x9d, 0xcc, 0xe4, 0x4b, 0xf0, 0xa9,
0xd9, 0x4e, 0x7e, 0x42, 0x96, 0x14, 0x7f, 0x06, 0x36, 0x48, 0x33, 0x6b, 0xc2, 0x5b, 0xb8, 0xfc, 0x6f, 0xb9, 0xf7, 0x9c, 0x7b, 0xee, 0xc9, 0x9c, 0x0b, 0x76, 0x46, 0xe3, 0x84, 0xd3, 0x51, 0x9a,
0x95, 0x7a, 0xa1, 0x26, 0xbb, 0xcb, 0xda, 0x4c, 0x24, 0x81, 0xf7, 0x50, 0x7b, 0x65, 0xc8, 0xf9, 0x25, 0x3c, 0x41, 0xa6, 0xaa, 0xbc, 0x09, 0x98, 0x6b, 0x12, 0xa7, 0x11, 0x45, 0x7d, 0x68, 0xbf,
0x60, 0xfa, 0x32, 0xae, 0xc9, 0x43, 0xb3, 0x10, 0xd5, 0x51, 0x7b, 0x73, 0xfc, 0x09, 0xca, 0x57, 0x93, 0x68, 0x4f, 0x07, 0xda, 0x2f, 0x6d, 0xa8, 0x61, 0x55, 0xa0, 0xdf, 0x60, 0xf3, 0x30, 0xa6,
0x39, 0x92, 0x7e, 0x97, 0xca, 0x22, 0xc2, 0xc5, 0xb7, 0x34, 0x29, 0xa4, 0x14, 0xf1, 0x7d, 0x26, 0x8c, 0x0b, 0x52, 0x10, 0xb3, 0x81, 0x2e, 0x40, 0x03, 0x77, 0xab, 0xde, 0x9c, 0x79, 0x97, 0x60,
0xe7, 0xb1, 0x98, 0x04, 0x97, 0x00, 0x1f, 0x21, 0x65, 0x20, 0xab, 0xc8, 0xe1, 0x03, 0x30, 0xbd, 0xcd, 0xc8, 0x86, 0x46, 0x0f, 0x24, 0xcc, 0x10, 0x82, 0xd6, 0x8e, 0xc4, 0x4a, 0xc4, 0xc2, 0xf2,
0x86, 0xb8, 0x30, 0x59, 0xb4, 0x55, 0x7f, 0xd3, 0x6d, 0xb8, 0x47, 0xb4, 0xd8, 0x0c, 0xd8, 0xc2, 0xbb, 0x56, 0xd6, 0x65, 0x53, 0x15, 0x1e, 0x01, 0xf0, 0x85, 0xca, 0x9a, 0x66, 0x21, 0x65, 0xe8,
0x95, 0x8b, 0xc8, 0x2b, 0xcd, 0xea, 0xbd, 0xde, 0xbd, 0xe9, 0x27, 0x62, 0x6f, 0xf3, 0x17, 0xa8, 0x1f, 0x98, 0x51, 0x2e, 0xc2, 0xc4, 0xa4, 0x31, 0xec, 0x8e, 0x7f, 0x8e, 0x0a, 0xbb, 0x95, 0x34,
0x3f, 0xad, 0xf2, 0x24, 0xe8, 0x67, 0x09, 0xb8, 0xd8, 0x03, 0x44, 0xf0, 0xb8, 0x72, 0x5b, 0x84, 0x2e, 0x08, 0x68, 0x08, 0x1d, 0x26, 0x2d, 0xe7, 0x6e, 0x72, 0xae, 0x53, 0x72, 0xd5, 0x9f, 0xe0,
0xfb, 0xf0, 0x09, 0x23, 0xfe, 0xb9, 0x46, 0x16, 0xef, 0xf5, 0xf8, 0x17, 0x00, 0x00, 0xff, 0xff, 0x12, 0xf6, 0xae, 0xc1, 0x7e, 0xca, 0x42, 0x4e, 0x31, 0x7d, 0xdb, 0x0b, 0xbb, 0x68, 0x0c, 0x20,
0x73, 0xb4, 0xd1, 0xb6, 0x3f, 0x01, 0x00, 0x00, 0x8d, 0xcb, 0x95, 0xc5, 0x22, 0x54, 0x0e, 0xd7, 0x66, 0x70, 0x83, 0xe5, 0x5d, 0x41, 0x17, 0x53,
0xb2, 0x2d, 0x25, 0xfe, 0x42, 0x47, 0x7c, 0x34, 0xe6, 0x7b, 0xe5, 0xfc, 0x4a, 0xb4, 0x0f, 0xb8,
0x44, 0xf3, 0xdd, 0x6a, 0x8e, 0xa5, 0xc9, 0x8e, 0xd1, 0x2f, 0xed, 0xfe, 0xd0, 0xa0, 0x2d, 0x65,
0xd1, 0x19, 0x20, 0xf1, 0xdc, 0x19, 0x0f, 0x8e, 0xc2, 0xd0, 0x64, 0x18, 0xae, 0x44, 0xfc, 0x3a,
0x11, 0xf1, 0x42, 0x2e, 0xdd, 0x6d, 0x83, 0x13, 0xc1, 0x39, 0xa2, 0xdf, 0x64, 0x9e, 0xc3, 0xf7,
0x98, 0xf0, 0x97, 0x57, 0x9a, 0xb1, 0x81, 0x21, 0x3d, 0xf5, 0x8f, 0x1e, 0x7e, 0xae, 0x40, 0x5c,
0xb1, 0xbc, 0x00, 0xec, 0x26, 0x82, 0xfe, 0x40, 0x8b, 0x1f, 0x52, 0x15, 0xb8, 0x53, 0xc7, 0x26,
0x61, 0x5f, 0x00, 0x58, 0xc2, 0xd5, 0x5d, 0xe8, 0xa7, 0xee, 0xc2, 0x68, 0xdc, 0xc5, 0xff, 0x7b,
0xb0, 0xaa, 0x61, 0x64, 0x41, 0x7b, 0xba, 0x7a, 0x9c, 0xcc, 0xdc, 0x6f, 0xa8, 0x07, 0xd6, 0x62,
0xe9, 0x07, 0xaa, 0xd4, 0xd0, 0x0f, 0x91, 0xcb, 0xf4, 0x76, 0xfa, 0x1c, 0xcc, 0x27, 0xfe, 0xcd,
0x9d, 0xab, 0x8b, 0x0d, 0x8e, 0x6a, 0x2c, 0x96, 0x45, 0xcf, 0xd8, 0x98, 0xf2, 0xd8, 0x2f, 0x3e,
0x03, 0x00, 0x00, 0xff, 0xff, 0x24, 0x79, 0x44, 0x11, 0xfc, 0x02, 0x00, 0x00,
} }

View file

@ -34,3 +34,30 @@ message TimeSeries {
message WriteRequest { message WriteRequest {
repeated TimeSeries timeseries = 1; repeated TimeSeries timeseries = 1;
} }
message ReadRequest {
repeated Query queries = 1;
}
message ReadResponse {
repeated TimeSeries timeseries = 1;
}
message Query {
int64 start_timestamp_ms = 1;
int64 end_timestamp_ms = 2;
repeated LabelMatcher matchers = 3;
}
enum MatchType {
EQUAL = 0;
NOT_EQUAL = 1;
REGEX_MATCH = 2;
REGEX_NO_MATCH = 3;
}
message LabelMatcher {
MatchType type = 1;
string name = 2;
string value = 3;
}

View file

@ -21,22 +21,26 @@ import (
"github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/config"
) )
// Storage allows queueing samples for remote writes. // Writer allows queueing samples for remote writes.
type Storage struct { type Writer struct {
mtx sync.RWMutex mtx sync.RWMutex
queues []*QueueManager queues []*QueueManager
} }
// ApplyConfig updates the state as the new config requires. // ApplyConfig updates the state as the new config requires.
func (s *Storage) ApplyConfig(conf *config.Config) error { func (w *Writer) ApplyConfig(conf *config.Config) error {
s.mtx.Lock() w.mtx.Lock()
defer s.mtx.Unlock() defer w.mtx.Unlock()
newQueues := []*QueueManager{} newQueues := []*QueueManager{}
// TODO: we should only stop & recreate queues which have changes, // TODO: we should only stop & recreate queues which have changes,
// as this can be quite disruptive. // as this can be quite disruptive.
for i, rwConf := range conf.RemoteWriteConfigs { for i, rwConf := range conf.RemoteWriteConfigs {
c, err := NewClient(i, rwConf) c, err := NewClient(i, &clientConfig{
url: rwConf.URL,
timeout: rwConf.RemoteTimeout,
httpClientConfig: rwConf.HTTPClientConfig,
})
if err != nil { if err != nil {
return err return err
} }
@ -47,30 +51,30 @@ func (s *Storage) ApplyConfig(conf *config.Config) error {
})) }))
} }
for _, q := range s.queues { for _, q := range w.queues {
q.Stop() q.Stop()
} }
s.queues = newQueues w.queues = newQueues
for _, q := range s.queues { for _, q := range w.queues {
q.Start() q.Start()
} }
return nil return nil
} }
// Stop the background processing of the storage queues. // Stop the background processing of the storage queues.
func (s *Storage) Stop() { func (w *Writer) Stop() {
for _, q := range s.queues { for _, q := range w.queues {
q.Stop() q.Stop()
} }
} }
// Append implements storage.SampleAppender. Always returns nil. // Append implements storage.SampleAppender. Always returns nil.
func (s *Storage) Append(smpl *model.Sample) error { func (w *Writer) Append(smpl *model.Sample) error {
s.mtx.RLock() w.mtx.RLock()
defer s.mtx.RUnlock() defer w.mtx.RUnlock()
for _, q := range s.queues { for _, q := range w.queues {
q.Append(smpl) q.Append(smpl)
} }
return nil return nil
@ -79,6 +83,6 @@ func (s *Storage) Append(smpl *model.Sample) error {
// NeedsThrottling implements storage.SampleAppender. It will always return // NeedsThrottling implements storage.SampleAppender. It will always return
// false as a remote storage drops samples on the floor if backlogging instead // false as a remote storage drops samples on the floor if backlogging instead
// of asking for throttling. // of asking for throttling.
func (s *Storage) NeedsThrottling() bool { func (w *Writer) NeedsThrottling() bool {
return false return false
} }

View file

@ -21,6 +21,7 @@ import (
"net" "net"
"net/http" "net/http"
"net/url" "net/url"
"strings"
"time" "time"
"github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/config"
@ -31,10 +32,42 @@ func NewClient(rt http.RoundTripper) *http.Client {
return &http.Client{Transport: rt} return &http.Client{Transport: rt}
} }
// NewDeadlineClient returns a new http.Client which will time out long running // NewClientFromConfig returns a new HTTP client configured for the
// requests. // given config.HTTPClientConfig.
func NewDeadlineClient(timeout time.Duration, proxyURL *url.URL) *http.Client { func NewClientFromConfig(cfg config.HTTPClientConfig) (*http.Client, error) {
return NewClient(NewDeadlineRoundTripper(timeout, proxyURL)) tlsConfig, err := NewTLSConfig(cfg.TLSConfig)
if err != nil {
return nil, err
}
// The only timeout we care about is the configured scrape timeout.
// It is applied on request. So we leave out any timings here.
var rt http.RoundTripper = &http.Transport{
Proxy: http.ProxyURL(cfg.ProxyURL.URL),
DisableKeepAlives: true,
TLSClientConfig: tlsConfig,
}
// If a bearer token is provided, create a round tripper that will set the
// Authorization header correctly on each request.
bearerToken := cfg.BearerToken
if len(bearerToken) == 0 && len(cfg.BearerTokenFile) > 0 {
b, err := ioutil.ReadFile(cfg.BearerTokenFile)
if err != nil {
return nil, fmt.Errorf("unable to read bearer token file %s: %s", cfg.BearerTokenFile, err)
}
bearerToken = strings.TrimSpace(string(b))
}
if len(bearerToken) > 0 {
rt = NewBearerAuthRoundTripper(bearerToken, rt)
}
if cfg.BasicAuth != nil {
rt = NewBasicAuthRoundTripper(cfg.BasicAuth.Username, cfg.BasicAuth.Password, rt)
}
// Return a new client with the configured round tripper.
return NewClient(rt), nil
} }
// NewDeadlineRoundTripper returns a new http.RoundTripper which will time out // NewDeadlineRoundTripper returns a new http.RoundTripper which will time out
@ -119,6 +152,7 @@ func cloneRequest(r *http.Request) *http.Request {
return r2 return r2
} }
// NewTLSConfig creates a new tls.Config from the given config.TLSConfig.
func NewTLSConfig(cfg config.TLSConfig) (*tls.Config, error) { func NewTLSConfig(cfg config.TLSConfig) (*tls.Config, error) {
tlsConfig := &tls.Config{InsecureSkipVerify: cfg.InsecureSkipVerify} tlsConfig := &tls.Config{InsecureSkipVerify: cfg.InsecureSkipVerify}