Refactor remote storage querier handling

* Decouple remote client from ReadRecent feature.
* Separate remote read filter into a small, testable function.
* Use storage.Queryable interface to compose independent
  functionalities.
This commit is contained in:
Tobias Schmidt 2017-11-12 02:15:27 +01:00
parent 9b0091d487
commit 434f0374f7
5 changed files with 175 additions and 123 deletions

View file

@ -37,18 +37,16 @@ const maxErrMsgLen = 256
// Client allows reading and writing from/to a remote HTTP endpoint. // Client allows reading and writing from/to a remote HTTP endpoint.
type Client struct { type Client struct {
index int // Used to differentiate metrics. index int // Used to differentiate clients in metrics.
url *config.URL url *config.URL
client *http.Client client *http.Client
timeout time.Duration timeout time.Duration
readRecent bool
} }
// ClientConfig configures a Client. // ClientConfig configures a Client.
type ClientConfig struct { type ClientConfig struct {
URL *config.URL URL *config.URL
Timeout model.Duration Timeout model.Duration
ReadRecent bool
HTTPClientConfig config.HTTPClientConfig HTTPClientConfig config.HTTPClientConfig
} }
@ -60,11 +58,10 @@ func NewClient(index int, conf *ClientConfig) (*Client, error) {
} }
return &Client{ return &Client{
index: index, index: index,
url: conf.URL, url: conf.URL,
client: httpClient, client: httpClient,
timeout: time.Duration(conf.Timeout), timeout: time.Duration(conf.Timeout),
readRecent: conf.ReadRecent,
}, nil }, nil
} }

View file

@ -158,6 +158,12 @@ func FromQueryResult(res *prompb.QueryResult) storage.SeriesSet {
} }
} }
// byLabel implements sort.Interface for a slice of storage.Series, ordering
// the series by their label sets as compared with labels.Compare.
type byLabel []storage.Series

// Len returns the number of series in the slice.
func (s byLabel) Len() int {
	return len(s)
}

// Swap exchanges the series at positions i and j.
func (s byLabel) Swap(i, j int) {
	s[j], s[i] = s[i], s[j]
}

// Less reports whether the labels of the series at position i compare lower
// than the labels of the series at position j.
func (s byLabel) Less(i, j int) bool {
	left, right := s[i].Labels(), s[j].Labels()
	return labels.Compare(left, right) < 0
}
// errSeriesSet implements storage.SeriesSet, just returning an error. // errSeriesSet implements storage.SeriesSet, just returning an error.
type errSeriesSet struct { type errSeriesSet struct {
err error err error

View file

@ -21,55 +21,30 @@ import (
"github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/storage"
) )
// Querier returns a new Querier on the storage. // QueryableClient returns a storage.Queryable which queries the given
func (r *Storage) Querier(ctx context.Context, mint, maxt int64) (storage.Querier, error) { // Client to select series sets.
r.mtx.Lock() func QueryableClient(c *Client) storage.Queryable {
defer r.mtx.Unlock() return storage.QueryableFunc(func(ctx context.Context, mint, maxt int64) (storage.Querier, error) {
return &querier{
queriers := make([]storage.Querier, 0, len(r.clients)) ctx: ctx,
localStartTime, err := r.localStartTimeCallback() mint: mint,
if err != nil { maxt: maxt,
return nil, err client: c,
} }, nil
for _, c := range r.clients { })
cmaxt := maxt
if !c.readRecent {
// Avoid queries whose timerange is later than the first timestamp in local DB.
if mint > localStartTime {
continue
}
// Query only samples older than the first timestamp in local DB.
if maxt > localStartTime {
cmaxt = localStartTime
}
}
queriers = append(queriers, &querier{
ctx: ctx,
mint: mint,
maxt: cmaxt,
client: c,
externalLabels: r.externalLabels,
})
}
return newMergeQueriers(queriers), nil
} }
// Store it in variable to make it mockable in tests since a mergeQuerier is not publicly exposed. // querier is an adapter to make a Client usable as a storage.Querier.
var newMergeQueriers = storage.NewMergeQuerier
// Querier is an adapter to make a Client usable as a storage.Querier.
type querier struct { type querier struct {
ctx context.Context ctx context.Context
mint, maxt int64 mint, maxt int64
client *Client client *Client
externalLabels model.LabelSet
} }
// Select returns a set of series that matches the given label matchers. // Select implements storage.Querier and uses the given matchers to read series
// sets from the Client.
func (q *querier) Select(matchers ...*labels.Matcher) storage.SeriesSet { func (q *querier) Select(matchers ...*labels.Matcher) storage.SeriesSet {
m, added := q.addExternalLabels(matchers) query, err := ToQuery(q.mint, q.maxt, matchers)
query, err := ToQuery(q.mint, q.maxt, m)
if err != nil { if err != nil {
return errSeriesSet{err: err} return errSeriesSet{err: err}
} }
@ -79,40 +54,83 @@ func (q *querier) Select(matchers ...*labels.Matcher) storage.SeriesSet {
return errSeriesSet{err: err} return errSeriesSet{err: err}
} }
seriesSet := FromQueryResult(res) return FromQueryResult(res)
return newSeriesSetFilter(seriesSet, added)
} }
type byLabel []storage.Series // LabelValues implements storage.Querier and is a noop.
func (a byLabel) Len() int { return len(a) }
func (a byLabel) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a byLabel) Less(i, j int) bool { return labels.Compare(a[i].Labels(), a[j].Labels()) < 0 }
// LabelValues returns all potential values for a label name.
func (q *querier) LabelValues(name string) ([]string, error) { func (q *querier) LabelValues(name string) ([]string, error) {
// TODO implement? // TODO implement?
return nil, nil return nil, nil
} }
// Close releases the resources of the Querier. // Close implements storage.Querier and is a noop.
func (q *querier) Close() error { func (q *querier) Close() error {
return nil return nil
} }
// ExternablLabelsHandler returns a storage.Queryable whose queriers are
// externalLabelsQuerier values wrapping the queriers obtained from next.
// An error returned by next.Querier is passed through unchanged and no
// wrapper is created in that case.
func ExternablLabelsHandler(next storage.Queryable, externalLabels model.LabelSet) storage.Queryable {
	wrap := func(ctx context.Context, mint, maxt int64) (storage.Querier, error) {
		inner, err := next.Querier(ctx, mint, maxt)
		if err != nil {
			return nil, err
		}
		wrapped := &externalLabelsQuerier{
			Querier:        inner,
			externalLabels: externalLabels,
		}
		return wrapped, nil
	}
	return storage.QueryableFunc(wrap)
}
// externalLabelsQuerier is a querier which ensures that Select() results match
// the configured external labels. It embeds a storage.Querier, so every method
// it does not define itself is promoted from the wrapped querier.
type externalLabelsQuerier struct {
// Querier is the wrapped querier that receives the extended matcher list.
storage.Querier
// externalLabels is the label set from which additional matchers are derived.
externalLabels model.LabelSet
}
// Select extends the given matchers with the ones produced by
// addExternalLabels, forwards the extended list to the wrapped
// storage.Querier, and wraps the resulting series set in a filter built by
// newSeriesSetFilter from the set of labels that were added.
func (q externalLabelsQuerier) Select(matchers ...*labels.Matcher) storage.SeriesSet {
	extended, added := q.addExternalLabels(matchers)
	return newSeriesSetFilter(q.Querier.Select(extended...), added)
}
// PreferLocalStorageFilter returns a storage.Queryable that consults cb for
// the local start time before delegating to next:
//
//   - if mint is later than the local start time, a storage.NoopQuerier is
//     returned and next is not called;
//   - if only maxt is later than the local start time, next is queried with
//     maxt reduced to the local start time;
//   - otherwise next is queried with the requested range unchanged.
//
// An error from cb is returned as is and next is not called.
func PreferLocalStorageFilter(next storage.Queryable, cb startTimeCallback) storage.Queryable {
	filter := func(ctx context.Context, mint, maxt int64) (storage.Querier, error) {
		localStart, err := cb()
		if err != nil {
			return nil, err
		}
		switch {
		case mint > localStart:
			// The whole requested range is later than the first timestamp in local DB.
			return storage.NoopQuerier(), nil
		case maxt > localStart:
			// Query only samples older than the first timestamp in local DB.
			return next.Querier(ctx, mint, localStart)
		default:
			return next.Querier(ctx, mint, maxt)
		}
	}
	return storage.QueryableFunc(filter)
}
// addExternalLabels adds matchers for each external label. External labels // addExternalLabels adds matchers for each external label. External labels
// that already have a corresponding user-supplied matcher are skipped, as we // that already have a corresponding user-supplied matcher are skipped, as we
// assume that the user explicitly wants to select a different value for them. // assume that the user explicitly wants to select a different value for them.
// We return the new set of matchers, along with a map of labels for which // We return the new set of matchers, along with a map of labels for which
// matchers were added, so that these can later be removed from the result // matchers were added, so that these can later be removed from the result
// time series again. // time series again.
func (q *querier) addExternalLabels(matchers []*labels.Matcher) ([]*labels.Matcher, model.LabelSet) { func (q externalLabelsQuerier) addExternalLabels(ms []*labels.Matcher) ([]*labels.Matcher, model.LabelSet) {
el := make(model.LabelSet, len(q.externalLabels)) el := make(model.LabelSet, len(q.externalLabels))
for k, v := range q.externalLabels { for k, v := range q.externalLabels {
el[k] = v el[k] = v
} }
for _, m := range matchers { for _, m := range ms {
if _, ok := el[model.LabelName(m.Name)]; ok { if _, ok := el[model.LabelName(m.Name)]; ok {
delete(el, model.LabelName(m.Name)) delete(el, model.LabelName(m.Name))
} }
@ -122,9 +140,9 @@ func (q *querier) addExternalLabels(matchers []*labels.Matcher) ([]*labels.Match
if err != nil { if err != nil {
panic(err) panic(err)
} }
matchers = append(matchers, m) ms = append(ms, m)
} }
return matchers, el return ms, el
} }
func newSeriesSetFilter(ss storage.SeriesSet, toFilter model.LabelSet) storage.SeriesSet { func newSeriesSetFilter(ss storage.SeriesSet, toFilter model.LabelSet) storage.SeriesSet {

View file

@ -18,10 +18,8 @@ import (
"reflect" "reflect"
"sort" "sort"
"testing" "testing"
"time"
"github.com/prometheus/common/model" "github.com/prometheus/common/model"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/prompb" "github.com/prometheus/prometheus/prompb"
"github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/storage"
@ -35,7 +33,22 @@ func mustNewLabelMatcher(mt labels.MatchType, name, val string) *labels.Matcher
return m return m
} }
func TestAddExternalLabels(t *testing.T) { func TestExternalLabelsQuerierSelect(t *testing.T) {
matchers := []*labels.Matcher{
mustNewLabelMatcher(labels.MatchEqual, "job", "api-server"),
}
q := &externalLabelsQuerier{
Querier: mockQuerier{},
externalLabels: model.LabelSet{"region": "europe"},
}
want := newSeriesSetFilter(mockSeriesSet{}, q.externalLabels)
if have := q.Select(matchers...); !reflect.DeepEqual(want, have) {
t.Errorf("expected series set %+v, got %+v", want, have)
}
}
func TestExternalLabelsQuerierAddExternalLabels(t *testing.T) {
tests := []struct { tests := []struct {
el model.LabelSet el model.LabelSet
inMatchers []*labels.Matcher inMatchers []*labels.Matcher
@ -80,10 +93,7 @@ func TestAddExternalLabels(t *testing.T) {
} }
for i, test := range tests { for i, test := range tests {
q := querier{ q := &externalLabelsQuerier{Querier: mockQuerier{}, externalLabels: test.el}
externalLabels: test.el,
}
matchers, added := q.addExternalLabels(test.inMatchers) matchers, added := q.addExternalLabels(test.inMatchers)
sort.Slice(test.outMatchers, func(i, j int) bool { return test.outMatchers[i].Name < test.outMatchers[j].Name }) sort.Slice(test.outMatchers, func(i, j int) bool { return test.outMatchers[i].Name < test.outMatchers[j].Name })
@ -133,58 +143,57 @@ func TestSeriesSetFilter(t *testing.T) {
} }
} }
type mockMergeQuerier struct{ queriersCount int } type testQuerier struct {
ctx context.Context
mint, maxt int64
func (*mockMergeQuerier) Select(...*labels.Matcher) storage.SeriesSet { return nil } storage.Querier
func (*mockMergeQuerier) LabelValues(name string) ([]string, error) { return nil, nil } }
func (*mockMergeQuerier) Close() error { return nil }
func TestPreferLocalStorageFilter(t *testing.T) {
ctx := context.Background()
func TestRemoteStorageQuerier(t *testing.T) {
tests := []struct { tests := []struct {
localStartTime int64 localStartTime int64
readRecentClients []bool mint int64
mint int64 maxt int64
maxt int64 querier storage.Querier
expectedQueriersCount int
}{ }{
{ {
localStartTime: int64(20), localStartTime: int64(100),
readRecentClients: []bool{true, true, false}, mint: int64(0),
mint: int64(0), maxt: int64(50),
maxt: int64(50), querier: testQuerier{ctx: ctx, mint: 0, maxt: 50},
expectedQueriersCount: 3,
}, },
{ {
localStartTime: int64(20), localStartTime: int64(20),
readRecentClients: []bool{true, true, false}, mint: int64(0),
mint: int64(30), maxt: int64(50),
maxt: int64(50), querier: testQuerier{ctx: ctx, mint: 0, maxt: 20},
expectedQueriersCount: 2, },
{
localStartTime: int64(20),
mint: int64(30),
maxt: int64(50),
querier: storage.NoopQuerier(),
}, },
} }
for i, test := range tests { for i, test := range tests {
s := NewStorage(nil, func() (int64, error) { return test.localStartTime, nil }) f := PreferLocalStorageFilter(
s.clients = []*Client{} storage.QueryableFunc(func(ctx context.Context, mint, maxt int64) (storage.Querier, error) {
for _, readRecent := range test.readRecentClients { return testQuerier{ctx: ctx, mint: mint, maxt: maxt}, nil
c, _ := NewClient(0, &ClientConfig{ }),
URL: nil, func() (int64, error) { return test.localStartTime, nil },
Timeout: model.Duration(30 * time.Second), )
HTTPClientConfig: config.HTTPClientConfig{},
ReadRecent: readRecent, q, err := f.Querier(ctx, test.mint, test.maxt)
}) if err != nil {
s.clients = append(s.clients, c) t.Fatal(err)
}
// overrides mergeQuerier to mockMergeQuerier so we can reflect its type
newMergeQueriers = func(queriers []storage.Querier) storage.Querier {
return &mockMergeQuerier{queriersCount: len(queriers)}
} }
querier, _ := s.Querier(context.Background(), test.mint, test.maxt) if test.querier != q {
actualQueriersCount := reflect.ValueOf(querier).Interface().(*mockMergeQuerier).queriersCount t.Errorf("%d. expected quierer %+v, got %+v", i, test.querier, q)
if !reflect.DeepEqual(actualQueriersCount, test.expectedQueriersCount) {
t.Fatalf("%d. unexpected queriers count; want %v, got %v", i, test.expectedQueriersCount, actualQueriersCount)
} }
} }
} }

View file

@ -14,11 +14,13 @@
package remote package remote
import ( import (
"context"
"sync" "sync"
"github.com/go-kit/kit/log" "github.com/go-kit/kit/log"
"github.com/prometheus/common/model" "github.com/prometheus/common/model"
"github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/storage"
) )
// Callback func that returns the oldest timestamp stored in a storage. // Callback func that returns the oldest timestamp stored in a storage.
@ -34,9 +36,8 @@ type Storage struct {
queues []*QueueManager queues []*QueueManager
// For reads // For reads
clients []*Client queryables []storage.Queryable
localStartTimeCallback startTimeCallback localStartTimeCallback startTimeCallback
externalLabels model.LabelSet
} }
// NewStorage returns a remote.Storage. // NewStorage returns a remote.Storage.
@ -86,22 +87,25 @@ func (s *Storage) ApplyConfig(conf *config.Config) error {
// Update read clients // Update read clients
clients := []*Client{} s.queryables = make([]storage.Queryable, 0, len(conf.RemoteReadConfigs))
for i, rrConf := range conf.RemoteReadConfigs { for i, rrConf := range conf.RemoteReadConfigs {
c, err := NewClient(i, &ClientConfig{ c, err := NewClient(i, &ClientConfig{
URL: rrConf.URL, URL: rrConf.URL,
Timeout: rrConf.RemoteTimeout, Timeout: rrConf.RemoteTimeout,
HTTPClientConfig: rrConf.HTTPClientConfig, HTTPClientConfig: rrConf.HTTPClientConfig,
ReadRecent: rrConf.ReadRecent,
}) })
if err != nil { if err != nil {
return err return err
} }
clients = append(clients, c)
}
s.clients = clients var q storage.Queryable
s.externalLabels = conf.GlobalConfig.ExternalLabels q = QueryableClient(c)
q = ExternablLabelsHandler(q, conf.GlobalConfig.ExternalLabels)
if !rrConf.ReadRecent {
q = PreferLocalStorageFilter(q, s.localStartTimeCallback)
}
s.queryables = append(s.queryables, q)
}
return nil return nil
} }
@ -111,6 +115,24 @@ func (s *Storage) StartTime() (int64, error) {
return int64(model.Latest), nil return int64(model.Latest), nil
} }
// Querier returns a storage.Querier that merges, via storage.NewMergeQuerier,
// one querier per configured remote read queryable for the given time range.
//
// If any queryable fails to produce a querier, the queriers that were already
// opened are closed and the error of the failing queryable is returned.
func (s *Storage) Querier(ctx context.Context, mint, maxt int64) (storage.Querier, error) {
	// Take a snapshot of the slice under the lock so that the queriers are
	// created without holding the mutex.
	s.mtx.Lock()
	queryables := s.queryables
	s.mtx.Unlock()

	queriers := make([]storage.Querier, 0, len(queryables))
	for _, queryable := range queryables {
		q, err := queryable.Querier(ctx, mint, maxt)
		if err != nil {
			// Release the queriers opened so far instead of leaking them.
			// Their Close errors are deliberately dropped: the creation
			// failure is the error the caller needs to see.
			for _, opened := range queriers {
				_ = opened.Close()
			}
			return nil, err
		}
		queriers = append(queriers, q)
	}
	return storage.NewMergeQuerier(queriers), nil
}
// Close the background processing of the storage queues. // Close the background processing of the storage queues.
func (s *Storage) Close() error { func (s *Storage) Close() error {
s.mtx.Lock() s.mtx.Lock()