From bf4a279a91a668b1e26d6efc351562fa17ff6563 Mon Sep 17 00:00:00 2001 From: Thibault Chataigner Date: Wed, 18 Oct 2017 13:08:14 +0200 Subject: [PATCH] Remote storage reads based on oldest timestamp in primary storage (#3129) Currently all read queries are simply pushed to remote read clients. This is fine, except for remote storage for wich it unefficient and make query slower even if remote read is unnecessary. So we need instead to compare the oldest timestamp in primary/local storage with the query range lower boundary. If the oldest timestamp is older than the mint parameter, then there is no need for remote read. This is an optionnal behavior per remote read client. Signed-off-by: Thibault Chataigner --- cmd/prometheus/main.go | 5 ++- config/config.go | 3 +- config/config_test.go | 13 +++++++ config/testdata/conf.good.yml | 6 +++ storage/fanout.go | 22 +++++++++++ storage/interface.go | 3 ++ storage/remote/client.go | 19 ++++++---- storage/remote/read.go | 22 ++++++++++- storage/remote/read_test.go | 58 ++++++++++++++++++++++++++++ storage/remote/storage.go | 19 ++++++++-- storage/tsdb/tsdb.go | 71 ++++++++++++++++++++++++++++++++--- util/testutil/storage.go | 2 +- 12 files changed, 220 insertions(+), 23 deletions(-) diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index caabeffb0c..8a28ed6db7 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -225,7 +225,7 @@ func main() { var ( localStorage = &tsdb.ReadyStorage{} - remoteStorage = remote.NewStorage(log.With(logger, "component", "remote")) + remoteStorage = remote.NewStorage(log.With(logger, "component", "remote"), localStorage.StartTime) fanoutStorage = storage.NewFanout(logger, localStorage, remoteStorage) ) @@ -326,7 +326,8 @@ func main() { } level.Info(logger).Log("msg", "TSDB started") - localStorage.Set(db) + startTimeMargin := int64(time.Duration(cfg.tsdb.MinBlockDuration).Seconds() * 1000) + localStorage.Set(db, startTimeMargin) }() prometheus.MustRegister(configSuccess) diff --git a/config/config.go b/config/config.go index cc5eaefe6c..8a295b8961 100644 --- a/config/config.go +++ b/config/config.go @@ -197,6 +197,7 @@ var ( // DefaultRemoteReadConfig is the default remote read configuration. DefaultRemoteReadConfig = RemoteReadConfig{ RemoteTimeout: model.Duration(1 * time.Minute), + ReadRecent: true, } ) @@ -1490,7 +1491,7 @@ type QueueConfig struct { type RemoteReadConfig struct { URL *URL `yaml:"url"` RemoteTimeout model.Duration `yaml:"remote_timeout,omitempty"` - + ReadRecent bool `yaml:"read_recent,omitempty"` // We cannot do proper Go type embedding below as the parser will then parse // values arbitrarily into the overflow maps of further-down types. HTTPClientConfig HTTPClientConfig `yaml:",inline"` diff --git a/config/config_test.go b/config/config_test.go index 5f1a8d74ea..8ae364e28a 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -75,6 +75,19 @@ var expectedConf = &Config{ }, }, + RemoteReadConfigs: []*RemoteReadConfig{ + { + URL: mustParseURL("http://remote1/read"), + RemoteTimeout: model.Duration(1 * time.Minute), + ReadRecent: true, + }, + { + URL: mustParseURL("http://remote3/read"), + RemoteTimeout: model.Duration(1 * time.Minute), + ReadRecent: false, + }, + }, + ScrapeConfigs: []*ScrapeConfig{ { JobName: "prometheus", diff --git a/config/testdata/conf.good.yml b/config/testdata/conf.good.yml index 34da77d4ed..a65a401ac9 100644 --- a/config/testdata/conf.good.yml +++ b/config/testdata/conf.good.yml @@ -20,6 +20,12 @@ remote_write: action: drop - url: http://remote2/push +remote_read: + - url: http://remote1/read + read_recent: true + - url: http://remote3/read + read_recent: false + scrape_configs: - job_name: prometheus diff --git a/storage/fanout.go b/storage/fanout.go index b5743f1db4..2bc4323a74 100644 --- a/storage/fanout.go +++ b/storage/fanout.go @@ -20,6 +20,7 @@ import ( "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" + "github.com/prometheus/common/model" "github.com/prometheus/prometheus/pkg/labels" ) @@ -40,6 +41,27 @@ func NewFanout(logger log.Logger, primary Storage, secondaries ...Storage) Stora } } +// StartTime implements the Storage interface. +func (f *fanout) StartTime() (int64, error) { + // StartTime of a fanout should be the earliest StartTime of all its storages, + // both primary and secondaries. + firstTime, err := f.primary.StartTime() + if err != nil { + return int64(model.Latest), err + } + + for _, storage := range f.secondaries { + t, err := storage.StartTime() + if err != nil { + return int64(model.Latest), err + } + if t < firstTime { + firstTime = t + } + } + return firstTime, nil +} + func (f *fanout) Querier(ctx context.Context, mint, maxt int64) (Querier, error) { queriers := mergeQuerier{ queriers: make([]Querier, 0, 1+len(f.secondaries)), diff --git a/storage/interface.go b/storage/interface.go index 5eacb9f584..2b21792520 100644 --- a/storage/interface.go +++ b/storage/interface.go @@ -31,6 +31,9 @@ var ( // Storage ingests and manages samples, along with various indexes. All methods // are goroutine-safe. Storage implements storage.SampleAppender. type Storage interface { + // StartTime returns the oldest timestamp stored in the storage. + StartTime() (int64, error) + // Querier returns a new Querier on the storage. Querier(ctx context.Context, mint, maxt int64) (Querier, error) diff --git a/storage/remote/client.go b/storage/remote/client.go index f96181ce0d..7e4b23206c 100644 --- a/storage/remote/client.go +++ b/storage/remote/client.go @@ -37,15 +37,17 @@ const maxErrMsgLen = 256 // Client allows reading and writing from/to a remote HTTP endpoint. type Client struct { - index int // Used to differentiate metrics. - url *config.URL - client *http.Client - timeout time.Duration + index int // Used to differentiate metrics. + url *config.URL + client *http.Client + timeout time.Duration + readRecent bool } type clientConfig struct { url *config.URL timeout model.Duration + readRecent bool httpClientConfig config.HTTPClientConfig } @@ -57,10 +59,11 @@ func NewClient(index int, conf *clientConfig) (*Client, error) { } return &Client{ - index: index, - url: conf.url, - client: httpClient, - timeout: time.Duration(conf.timeout), + index: index, + url: conf.url, + client: httpClient, + timeout: time.Duration(conf.timeout), + readRecent: conf.readRecent, }, nil } diff --git a/storage/remote/read.go b/storage/remote/read.go index 1a88e6685c..8474a4236e 100644 --- a/storage/remote/read.go +++ b/storage/remote/read.go @@ -30,17 +30,35 @@ func (r *Storage) Querier(_ context.Context, mint, maxt int64) (storage.Querier, defer r.mtx.Unlock() queriers := make([]storage.Querier, 0, len(r.clients)) + localStartTime, err := r.localStartTimeCallback() + if err != nil { + return nil, err + } for _, c := range r.clients { + cmaxt := maxt + if !c.readRecent { + // Avoid queries whose timerange is later than the first timestamp in local DB. + if mint > localStartTime { + continue + } + // Query only samples older than the first timestamp in local DB. + if maxt > localStartTime { + cmaxt = localStartTime + } + } queriers = append(queriers, &querier{ mint: mint, - maxt: maxt, + maxt: cmaxt, client: c, externalLabels: r.externalLabels, }) } - return storage.NewMergeQuerier(queriers), nil + return newMergeQueriers(queriers), nil } +// Store it in variable to make it mockable in tests since a mergeQuerier is not publicly exposed. +var newMergeQueriers = storage.NewMergeQuerier + // Querier is an adapter to make a Client usable as a storage.Querier. type querier struct { mint, maxt int64 diff --git a/storage/remote/read_test.go b/storage/remote/read_test.go index 6b71afcdbb..cf50a0c5d3 100644 --- a/storage/remote/read_test.go +++ b/storage/remote/read_test.go @@ -17,8 +17,10 @@ import ( "reflect" "sort" "testing" + "time" "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/prompb" "github.com/prometheus/prometheus/storage" @@ -123,3 +125,59 @@ func TestConcreteSeriesSet(t *testing.T) { t.Fatalf("Expected Next() to be false.") } } + +type mockMergeQuerier struct{ queriersCount int } + +func (*mockMergeQuerier) Select(...*labels.Matcher) storage.SeriesSet { return nil } +func (*mockMergeQuerier) LabelValues(name string) ([]string, error) { return nil, nil } +func (*mockMergeQuerier) Close() error { return nil } + +func TestRemoteStorageQuerier(t *testing.T) { + tests := []struct { + localStartTime int64 + readRecentClients []bool + mint int64 + maxt int64 + expectedQueriersCount int + }{ + { + localStartTime: int64(20), + readRecentClients: []bool{true, true, false}, + mint: int64(0), + maxt: int64(50), + expectedQueriersCount: 3, + }, + { + localStartTime: int64(20), + readRecentClients: []bool{true, true, false}, + mint: int64(30), + maxt: int64(50), + expectedQueriersCount: 2, + }, + } + + for i, test := range tests { + s := NewStorage(nil, func() (int64, error) { return test.localStartTime, nil }) + s.clients = []*Client{} + for _, readRecent := range test.readRecentClients { + c, _ := NewClient(0, &clientConfig{ + url: nil, + timeout: model.Duration(30 * time.Second), + httpClientConfig: config.HTTPClientConfig{}, + readRecent: readRecent, + }) + s.clients = append(s.clients, c) + } + // overrides mergeQuerier to mockMergeQuerier so we can reflect its type + newMergeQueriers = func(queriers []storage.Querier) storage.Querier { + return &mockMergeQuerier{queriersCount: len(queriers)} + } + + querier, _ := s.Querier(nil, test.mint, test.maxt) + actualQueriersCount := reflect.ValueOf(querier).Interface().(*mockMergeQuerier).queriersCount + + if !reflect.DeepEqual(actualQueriersCount, test.expectedQueriersCount) { + t.Fatalf("%d. unexpected queriers count; want %v, got %v", i, test.expectedQueriersCount, actualQueriersCount) + } + } +} diff --git a/storage/remote/storage.go b/storage/remote/storage.go index f11ddb4740..65f3f2d19d 100644 --- a/storage/remote/storage.go +++ b/storage/remote/storage.go @@ -21,6 +21,9 @@ import ( "github.com/prometheus/prometheus/config" ) +// Callback func that return the oldest timestamp stored in a storage. +type startTimeCallback func() (int64, error) + // Storage represents all the remote read and write endpoints. It implements // storage.Storage. type Storage struct { @@ -31,15 +34,17 @@ type Storage struct { queues []*QueueManager // For reads - clients []*Client - externalLabels model.LabelSet + clients []*Client + localStartTimeCallback startTimeCallback + externalLabels model.LabelSet } -func NewStorage(l log.Logger) *Storage { +// NewStorage returns a remote.Storage. +func NewStorage(l log.Logger, stCallback startTimeCallback) *Storage { if l == nil { l = log.NewNopLogger() } - return &Storage{logger: l} + return &Storage{logger: l, localStartTimeCallback: stCallback} } // ApplyConfig updates the state as the new config requires. @@ -87,6 +92,7 @@ func (s *Storage) ApplyConfig(conf *config.Config) error { url: rrConf.URL, timeout: rrConf.RemoteTimeout, httpClientConfig: rrConf.HTTPClientConfig, + readRecent: rrConf.ReadRecent, }) if err != nil { return err @@ -100,6 +106,11 @@ func (s *Storage) ApplyConfig(conf *config.Config) error { return nil } +// StartTime implements the Storage interface. +func (s *Storage) StartTime() (int64, error) { + return int64(model.Latest), nil +} + // Close the background processing of the storage queues. func (s *Storage) Close() error { s.mtx.Lock() diff --git a/storage/tsdb/tsdb.go b/storage/tsdb/tsdb.go index 6be55c0c58..e580d0a65e 100644 --- a/storage/tsdb/tsdb.go +++ b/storage/tsdb/tsdb.go @@ -40,11 +40,11 @@ type ReadyStorage struct { } // Set the storage. -func (s *ReadyStorage) Set(db *tsdb.DB) { +func (s *ReadyStorage) Set(db *tsdb.DB, startTimeMargin int64) { s.mtx.Lock() defer s.mtx.Unlock() - s.a = &adapter{db: db} + s.a = &adapter{db: db, startTimeMargin: startTimeMargin} } // Get the storage. @@ -62,6 +62,14 @@ func (s *ReadyStorage) get() *adapter { return x } +// StartTime implements the Storage interface. +func (s *ReadyStorage) StartTime() (int64, error) { + if x := s.get(); x != nil { + return x.StartTime() + } + return int64(model.Latest), ErrNotReady +} + // Querier implements the Storage interface. func (s *ReadyStorage) Querier(ctx context.Context, mint, maxt int64) (storage.Querier, error) { if x := s.get(); x != nil { @@ -86,13 +94,15 @@ func (s *ReadyStorage) Close() error { return nil } -func Adapter(db *tsdb.DB) storage.Storage { - return &adapter{db: db} +// Adapter return an adapter as storage.Storage. +func Adapter(db *tsdb.DB, startTimeMargin int64) storage.Storage { + return &adapter{db: db, startTimeMargin: startTimeMargin} } // adapter implements a storage.Storage around TSDB. type adapter struct { - db *tsdb.DB + db *tsdb.DB + startTimeMargin int64 } // Options of the DB storage. @@ -139,6 +149,57 @@ func Open(path string, l log.Logger, r prometheus.Registerer, opts *Options) (*t return db, nil } +// StartTime implements the Storage interface. +func (a adapter) StartTime() (int64, error) { + startTime := int64(model.Latest) + + var indexr tsdb.IndexReader + if len(a.db.Blocks()) > 0 { + var err error + indexr, err = a.db.Blocks()[0].Index() + if err != nil { + return startTime, err + } + } else { + var err error + indexr, err = a.db.Head().Index() + if err != nil { + return startTime, err + } + } + + joblabel := "job" + tpls, err := indexr.LabelValues(joblabel) + if err != nil { + return startTime, err + } + + for i := 0; i < tpls.Len(); i++ { + vals, err := tpls.At(i) + if err != nil { + continue + } + + for _, v := range vals { + p, err := indexr.Postings(joblabel, v) + if err != nil { + continue + } + + if p.Next() { + var lset tsdbLabels.Labels + var chks []tsdb.ChunkMeta + indexr.Series(p.At(), &lset, &chks) + if startTime > chks[0].MinTime { + startTime = chks[0].MinTime + } + } + } + } + // Add a safety margin as it may take a few minutes for everything to spin up. + return startTime + a.startTimeMargin, nil +} + func (a adapter) Querier(_ context.Context, mint, maxt int64) (storage.Querier, error) { q, err := a.db.Querier(mint, maxt) if err != nil { diff --git a/util/testutil/storage.go b/util/testutil/storage.go index 46faa9dff4..246d6190d7 100644 --- a/util/testutil/storage.go +++ b/util/testutil/storage.go @@ -40,7 +40,7 @@ func NewStorage(t T) storage.Storage { if err != nil { t.Fatalf("Opening test storage failed: %s", err) } - return testStorage{Storage: tsdb.Adapter(db), dir: dir} + return testStorage{Storage: tsdb.Adapter(db, int64(0)), dir: dir} } type testStorage struct {